Files
jbod-monitor/services/enclosure.py

139 lines
4.2 KiB
Python

import os
import logging
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# sysfs directory whose sub-directories are treated as enclosures by
# discover_enclosures() and list_slots().
ENCLOSURE_BASE = Path("/sys/class/enclosure")
def _read_sysfs(path: Path) -> str:
    """Read a sysfs attribute file on a best-effort basis.

    Args:
        path: Location of the attribute file.

    Returns:
        The file content with leading/trailing whitespace stripped, or an
        empty string when the file cannot be read or decoded.
    """
    try:
        return path.read_text().strip()
    except (OSError, UnicodeDecodeError):
        # OSError: attribute missing, unreadable, or vanished mid-read.
        # UnicodeDecodeError: attribute holds bytes that are not valid in the
        # default text encoding; treat it like any other unreadable value
        # instead of letting the error abort the caller.
        return ""
def _find_sg_device(enclosure_path: Path) -> str | None:
"""Resolve the sg device for an enclosure from its sysfs path."""
# The enclosure sysfs directory has a 'device' symlink. Under that,
# there's a scsi_generic directory containing the sg device name.
sg_dir = enclosure_path / "device" / "scsi_generic"
if sg_dir.is_dir():
entries = list(sg_dir.iterdir())
if entries:
return f"/dev/{entries[0].name}"
return None
def discover_enclosures() -> list[dict]:
    """Discover SES enclosures by scanning ``ENCLOSURE_BASE``.

    Returns:
        One dict per sub-directory of ``ENCLOSURE_BASE``, in sorted path
        order, with the keys ``id``, ``sg_device``, ``vendor``, ``model``,
        ``revision``, ``total_slots`` and ``populated_slots``. An empty list
        is returned (and a warning logged) when ``ENCLOSURE_BASE`` is not a
        directory.
    """
    if not ENCLOSURE_BASE.is_dir():
        logger.warning("No enclosure sysfs directory found at %s", ENCLOSURE_BASE)
        return []

    found: list[dict] = []
    for enc_path in sorted(ENCLOSURE_BASE.iterdir()):
        if enc_path.is_dir():
            name = enc_path.name
            scsi_dev = enc_path / "device"

            # Identification strings come from the enclosure's device node.
            vendor, model, revision = (
                _read_sysfs(scsi_dev / attr) for attr in ("vendor", "model", "rev")
            )
            sg_node = _find_sg_device(enc_path)

            # Slot counts are derived from the per-slot listing.
            slot_list = list_slots(name)
            occupied = len([slot for slot in slot_list if slot["populated"]])

            summary = {
                "id": name,
                "sg_device": sg_node,
                "vendor": vendor,
                "model": model,
                "revision": revision,
                "total_slots": len(slot_list),
                "populated_slots": occupied,
            }
            found.append(summary)
    return found
def list_slots(enclosure_id: str) -> list[dict]:
    """Enumerate drive slots for an enclosure via sysfs.

    Args:
        enclosure_id: Name of the enclosure directory under ``ENCLOSURE_BASE``.

    Returns:
        A list of ``{"slot": int, "populated": bool, "device": str | None}``
        dicts sorted by slot number. ``device`` is the name of the block
        device linked to the slot, if any. An empty list is returned when the
        enclosure directory does not exist or cannot be listed.
    """
    enc_dir = ENCLOSURE_BASE / enclosure_id
    try:
        entries = sorted(enc_dir.iterdir())
    except OSError:
        # Missing, not a directory, or removed while we were looking at it.
        return []

    slots = []
    for entry in entries:
        if not entry.is_dir():
            continue

        # Determine if this is a drive slot element.
        # Some enclosures use named dirs ("Slot 00", "Disk 1", "ArrayDevice00"),
        # others use bare numeric dirs ("0", "1", "2") with a "type" file.
        slot_num = _parse_slot_number(entry)
        if slot_num is None:
            continue

        # A block device linked under the slot is the strongest evidence that
        # the slot is occupied.
        device = _first_block_device(entry / "device" / "block")
        if device is not None:
            populated = True
        else:
            # No block device found (directory missing *or* empty): fall back
            # to the 'status' file. "not installed" or an unreadable/empty
            # status means the slot is treated as empty.
            status = _read_sysfs(entry / "status")
            populated = status not in ("not installed", "")

        slots.append({
            "slot": slot_num,
            "populated": populated,
            "device": device,
        })

    slots.sort(key=lambda s: s["slot"])
    return slots


def _first_block_device(block_dir: Path) -> str | None:
    """Return the lowest entry name in *block_dir*, or None if there is none.

    A directory that is missing or cannot be listed is treated as empty.
    """
    try:
        return min((child.name for child in block_dir.iterdir()), default=None)
    except OSError:
        return None
def _parse_slot_number(entry: Path) -> int | None:
"""Extract the slot number from a sysfs slot directory.
Handles multiple naming conventions:
- Bare numeric dirs ("0", "1") with type=device and a slot file
- Named dirs ("Slot 00", "Slot00", "Disk 1", "ArrayDevice00")
"""
name = entry.name
# Bare numeric directory — check the type file to confirm it's a device slot
if name.isdigit():
entry_type = _read_sysfs(entry / "type")
if entry_type not in ("device", "disk", "array device"):
return None
# Prefer the 'slot' file for the actual slot number
slot_val = _read_sysfs(entry / "slot")
if slot_val.isdigit():
return int(slot_val)
return int(name)
# Named directory prefixes
for prefix in ("Slot ", "Slot", "Disk ", "Disk", "ArrayDevice", "SLOT "):
if name.startswith(prefix):
num_str = name[len(prefix):].strip()
try:
return int(num_str)
except ValueError:
return None
return None