import asyncio import logging import os import re from pathlib import Path logger = logging.getLogger(__name__) ENCLOSURE_BASE = Path("/sys/class/enclosure") def _read_sysfs(path: Path) -> str: """Read a sysfs attribute file, return stripped content or empty string.""" try: return path.read_text().strip() except (OSError, IOError): return "" def _find_sg_device(enclosure_path: Path) -> str | None: """Resolve the sg device for an enclosure from its sysfs path.""" # The enclosure sysfs directory has a 'device' symlink. Under that, # there's a scsi_generic directory containing the sg device name. sg_dir = enclosure_path / "device" / "scsi_generic" if sg_dir.is_dir(): entries = list(sg_dir.iterdir()) if entries: return f"/dev/{entries[0].name}" return None def discover_enclosures() -> list[dict]: """Walk /sys/class/enclosure/ to discover SES enclosures.""" if not ENCLOSURE_BASE.is_dir(): logger.warning("No enclosure sysfs directory found at %s", ENCLOSURE_BASE) return [] enclosures = [] for enc_dir in sorted(ENCLOSURE_BASE.iterdir()): if not enc_dir.is_dir(): continue enc_id = enc_dir.name device_dir = enc_dir / "device" vendor = _read_sysfs(device_dir / "vendor") model = _read_sysfs(device_dir / "model") revision = _read_sysfs(device_dir / "rev") sg_device = _find_sg_device(enc_dir) slots = list_slots(enc_id) total = len(slots) populated = sum(1 for s in slots if s["populated"]) enclosures.append({ "id": enc_id, "sg_device": sg_device, "vendor": vendor, "model": model, "revision": revision, "total_slots": total, "populated_slots": populated, }) return enclosures def list_slots(enclosure_id: str) -> list[dict]: """Enumerate drive slots for an enclosure via sysfs.""" enc_dir = ENCLOSURE_BASE / enclosure_id if not enc_dir.is_dir(): return [] slots = [] for entry in sorted(enc_dir.iterdir()): if not entry.is_dir(): continue # Determine if this is a drive slot element. # Some enclosures use named dirs ("Slot 00", "Disk 1", "ArrayDevice00"), # others use bare numeric dirs ("0", "1", "2") with a "type" file. slot_num = _parse_slot_number(entry) if slot_num is None: continue # Check if a block device is linked in this slot block_dir = entry / "device" / "block" device = None populated = False if block_dir.is_dir(): devs = list(block_dir.iterdir()) if devs: device = devs[0].name populated = True else: # Also check the 'status' file — "not installed" means empty status = _read_sysfs(entry / "status") if status and status not in ("not installed", ""): populated = True slots.append({ "slot": slot_num, "populated": populated, "device": device, }) slots.sort(key=lambda s: s["slot"]) return slots def _parse_slot_number(entry: Path) -> int | None: """Extract the slot number from a sysfs slot directory. Handles multiple naming conventions: - Bare numeric dirs ("0", "1") with type=device and a slot file - Named dirs ("Slot 00", "Slot00", "Disk 1", "ArrayDevice00") """ name = entry.name # Bare numeric directory — check the type file to confirm it's a device slot if name.isdigit(): entry_type = _read_sysfs(entry / "type") if entry_type not in ("device", "disk", "array device"): return None # Prefer the 'slot' file for the actual slot number slot_val = _read_sysfs(entry / "slot") if slot_val.isdigit(): return int(slot_val) return int(name) # Named directory prefixes for prefix in ("Slot ", "Slot", "Disk ", "Disk", "ArrayDevice", "SLOT "): if name.startswith(prefix): num_str = name[len(prefix):].strip() try: return int(num_str) except ValueError: return None return None async def get_enclosure_status(sg_device: str) -> dict | None: """Run sg_ses --page=0x02 and parse enclosure health data.""" try: proc = await asyncio.create_subprocess_exec( "sg_ses", "--page=0x02", sg_device, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: logger.warning("sg_ses failed for %s: %s", sg_device, stderr.decode().strip()) return None return _parse_ses_page02(stdout.decode(errors="replace")) except FileNotFoundError: logger.warning("sg_ses not found") return None except Exception as e: logger.warning("sg_ses error for %s: %s", sg_device, e) return None def _parse_ses_page02(text: str) -> dict: """Parse sg_ses --page=0x02 text output into structured health data.""" result = { "overall_status": "OK", "psus": [], "fans": [], "temps": [], "voltages": [], } # Parse header line for overall status: # INVOP=0, INFO=0, NON-CRIT=0, CRIT=1, UNRECOV=0 header_match = re.search( r"INVOP=\d+,\s*INFO=\d+,\s*NON-CRIT=(\d+),\s*CRIT=(\d+),\s*UNRECOV=(\d+)", text, ) if header_match: non_crit = int(header_match.group(1)) crit = int(header_match.group(2)) unrecov = int(header_match.group(3)) if crit > 0 or unrecov > 0: result["overall_status"] = "CRITICAL" elif non_crit > 0: result["overall_status"] = "WARNING" # Split into element type sections. # Each section starts with "Element type: " sections = re.split(r"(?=\s*Element type:)", text) for section in sections: type_match = re.match(r"\s*Element type:\s*(.+)", section) if not type_match: continue element_type = type_match.group(1).strip().rstrip(",").lower() # Find individual element blocks (skip "Overall descriptor") elements = re.split(r"(?=\s*Element \d+ descriptor:)", section) for elem_text in elements: desc_match = re.match(r"\s*Element (\d+) descriptor:", elem_text) if not desc_match: continue idx = int(desc_match.group(1)) # Extract status line status_match = re.search(r"status:\s*(.+?)(?:,|\n|$)", elem_text, re.IGNORECASE) status = status_match.group(1).strip() if status_match else "Unknown" if status.lower() == "not installed": continue if "power supply" in element_type: fail = "Fail=1" in elem_text ac_fail = "AC fail=1" in elem_text dc_fail = "DC fail=1" in elem_text result["psus"].append({ "index": idx, "status": status, "fail": fail, "ac_fail": ac_fail, "dc_fail": dc_fail, }) elif "cooling" in element_type or "fan" in element_type: fail = "Fail=1" in elem_text rpm_match = re.search(r"Actual speed[=:]\s*(\d+)\s*rpm", elem_text, re.IGNORECASE) rpm = int(rpm_match.group(1)) if rpm_match else None result["fans"].append({ "index": idx, "status": status, "rpm": rpm, "fail": fail, }) elif "temperature" in element_type: temp_match = re.search(r"Temperature=\s*([\d.]+)\s*C", elem_text) temp = float(temp_match.group(1)) if temp_match else None result["temps"].append({ "index": idx, "status": status, "temperature_c": temp, }) elif "voltage" in element_type: volt_match = re.search(r"Voltage:\s*([\d.]+)\s*V", elem_text, re.IGNORECASE) if not volt_match: volt_match = re.search(r"([\d.]+)\s*V", elem_text) voltage = float(volt_match.group(1)) if volt_match else None result["voltages"].append({ "index": idx, "status": status, "voltage": voltage, }) return result