139 lines
4.2 KiB
Python
139 lines
4.2 KiB
Python
import os
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Base sysfs directory for enclosures: discover_enclosures() iterates its
# child directories and list_slots() resolves enclosure ids relative to it.
ENCLOSURE_BASE = Path("/sys/class/enclosure")
|
|
|
|
|
|
def _read_sysfs(path: Path) -> str:
    """Read a sysfs attribute file on a best-effort basis.

    Args:
        path: Attribute file to read.

    Returns:
        The file content with surrounding whitespace stripped, or an empty
        string when the file cannot be read (missing, a directory, permission
        denied, I/O error).
    """
    try:
        # Decode explicitly and leniently: an attribute containing a stray
        # non-UTF-8 byte must not raise UnicodeDecodeError (which is not an
        # OSError) and abort the caller; undecodable bytes become U+FFFD.
        raw = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
    return raw.strip()
|
|
|
|
|
|
def _find_sg_device(enclosure_path: Path) -> str | None:
|
|
"""Resolve the sg device for an enclosure from its sysfs path."""
|
|
# The enclosure sysfs directory has a 'device' symlink. Under that,
|
|
# there's a scsi_generic directory containing the sg device name.
|
|
sg_dir = enclosure_path / "device" / "scsi_generic"
|
|
if sg_dir.is_dir():
|
|
entries = list(sg_dir.iterdir())
|
|
if entries:
|
|
return f"/dev/{entries[0].name}"
|
|
return None
|
|
|
|
|
|
def discover_enclosures() -> list[dict]:
    """Build a summary record for every enclosure under ENCLOSURE_BASE.

    Returns:
        One dict per enclosure directory, in sorted directory order, with the
        keys ``id``, ``sg_device``, ``vendor``, ``model``, ``revision``,
        ``total_slots`` and ``populated_slots``. When ENCLOSURE_BASE is not a
        directory a warning is logged and an empty list is returned.
    """
    if not ENCLOSURE_BASE.is_dir():
        logger.warning("No enclosure sysfs directory found at %s", ENCLOSURE_BASE)
        return []

    # Lazy filter: non-directory entries are skipped as they are reached.
    candidates = (p for p in sorted(ENCLOSURE_BASE.iterdir()) if p.is_dir())

    results = []
    for enclosure in candidates:
        attrs = enclosure / "device"
        # Identification strings come from the attribute files under device/.
        vendor, model, revision = (
            _read_sysfs(attrs / leaf) for leaf in ("vendor", "model", "rev")
        )
        sg_node = _find_sg_device(enclosure)

        slot_records = list_slots(enclosure.name)
        occupied = [rec for rec in slot_records if rec["populated"]]

        summary = {
            "id": enclosure.name,
            "sg_device": sg_node,
            "vendor": vendor,
            "model": model,
            "revision": revision,
            "total_slots": len(slot_records),
            "populated_slots": len(occupied),
        }
        results.append(summary)

    return results
|
|
|
|
|
|
def list_slots(enclosure_id: str) -> list[dict]:
    """List the drive slots of one enclosure as read from sysfs.

    Args:
        enclosure_id: Name of the enclosure directory under ENCLOSURE_BASE.

    Returns:
        A list of dicts with the keys ``slot`` (int), ``populated`` (bool) and
        ``device`` (block device name or None), ordered by slot number. Empty
        when the enclosure directory does not exist.
    """
    enclosure_root = ENCLOSURE_BASE / enclosure_id
    if not enclosure_root.is_dir():
        return []

    collected = []
    for element in sorted(enclosure_root.iterdir()):
        if not element.is_dir():
            continue

        # Slot elements are either named ("Slot 00", "Disk 1",
        # "ArrayDevice00") or bare numeric dirs ("0", "1") with a "type"
        # file; anything else is skipped.
        number = _parse_slot_number(element)
        if number is None:
            continue

        attached = None
        block_path = element / "device" / "block"
        if block_path.is_dir():
            # A linked block device marks the slot as populated.
            children = list(block_path.iterdir())
            is_populated = bool(children)
            if children:
                attached = children[0].name
        else:
            # No block directory: fall back to the 'status' file, where an
            # empty value or "not installed" means the slot is empty.
            state = _read_sysfs(element / "status")
            is_populated = state not in ("", "not installed")

        collected.append({
            "slot": number,
            "populated": is_populated,
            "device": attached,
        })

    return sorted(collected, key=lambda record: record["slot"])
|
|
|
|
|
|
def _parse_slot_number(entry: Path) -> int | None:
|
|
"""Extract the slot number from a sysfs slot directory.
|
|
|
|
Handles multiple naming conventions:
|
|
- Bare numeric dirs ("0", "1") with type=device and a slot file
|
|
- Named dirs ("Slot 00", "Slot00", "Disk 1", "ArrayDevice00")
|
|
"""
|
|
name = entry.name
|
|
|
|
# Bare numeric directory — check the type file to confirm it's a device slot
|
|
if name.isdigit():
|
|
entry_type = _read_sysfs(entry / "type")
|
|
if entry_type not in ("device", "disk", "array device"):
|
|
return None
|
|
# Prefer the 'slot' file for the actual slot number
|
|
slot_val = _read_sysfs(entry / "slot")
|
|
if slot_val.isdigit():
|
|
return int(slot_val)
|
|
return int(name)
|
|
|
|
# Named directory prefixes
|
|
for prefix in ("Slot ", "Slot", "Disk ", "Disk", "ArrayDevice", "SLOT "):
|
|
if name.startswith(prefix):
|
|
num_str = name[len(prefix):].strip()
|
|
try:
|
|
return int(num_str)
|
|
except ValueError:
|
|
return None
|
|
return None
|