Initial commit: FastAPI JBOD monitor backend
This commit is contained in:
118
services/enclosure.py
Normal file
118
services/enclosure.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import os
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ENCLOSURE_BASE = Path("/sys/class/enclosure")
|
||||
|
||||
|
||||
def _read_sysfs(path: Path) -> str:
|
||||
"""Read a sysfs attribute file, return stripped content or empty string."""
|
||||
try:
|
||||
return path.read_text().strip()
|
||||
except (OSError, IOError):
|
||||
return ""
|
||||
|
||||
|
||||
def _find_sg_device(enclosure_path: Path) -> str | None:
|
||||
"""Resolve the sg device for an enclosure from its sysfs path."""
|
||||
# The enclosure sysfs directory has a 'device' symlink. Under that,
|
||||
# there's a scsi_generic directory containing the sg device name.
|
||||
sg_dir = enclosure_path / "device" / "scsi_generic"
|
||||
if sg_dir.is_dir():
|
||||
entries = list(sg_dir.iterdir())
|
||||
if entries:
|
||||
return f"/dev/{entries[0].name}"
|
||||
return None
|
||||
|
||||
|
||||
def discover_enclosures() -> list[dict]:
|
||||
"""Walk /sys/class/enclosure/ to discover SES enclosures."""
|
||||
if not ENCLOSURE_BASE.is_dir():
|
||||
logger.warning("No enclosure sysfs directory found at %s", ENCLOSURE_BASE)
|
||||
return []
|
||||
|
||||
enclosures = []
|
||||
for enc_dir in sorted(ENCLOSURE_BASE.iterdir()):
|
||||
if not enc_dir.is_dir():
|
||||
continue
|
||||
|
||||
enc_id = enc_dir.name
|
||||
device_dir = enc_dir / "device"
|
||||
|
||||
vendor = _read_sysfs(device_dir / "vendor")
|
||||
model = _read_sysfs(device_dir / "model")
|
||||
revision = _read_sysfs(device_dir / "rev")
|
||||
sg_device = _find_sg_device(enc_dir)
|
||||
|
||||
slots = list_slots(enc_id)
|
||||
total = len(slots)
|
||||
populated = sum(1 for s in slots if s["populated"])
|
||||
|
||||
enclosures.append({
|
||||
"id": enc_id,
|
||||
"sg_device": sg_device,
|
||||
"vendor": vendor,
|
||||
"model": model,
|
||||
"revision": revision,
|
||||
"total_slots": total,
|
||||
"populated_slots": populated,
|
||||
})
|
||||
|
||||
return enclosures
|
||||
|
||||
|
||||
def list_slots(enclosure_id: str) -> list[dict]:
|
||||
"""Enumerate drive slots for an enclosure via sysfs."""
|
||||
enc_dir = ENCLOSURE_BASE / enclosure_id
|
||||
if not enc_dir.is_dir():
|
||||
return []
|
||||
|
||||
slots = []
|
||||
for entry in sorted(enc_dir.iterdir()):
|
||||
# Slot entries are directories like "Slot 00", "Slot 01", etc.
|
||||
# Some enclosures use "Disk" or "ArrayDevice" prefixes.
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
name = entry.name
|
||||
slot_num = _parse_slot_number(name)
|
||||
if slot_num is None:
|
||||
continue
|
||||
|
||||
# Check if a block device is linked in this slot
|
||||
block_dir = entry / "device" / "block"
|
||||
device = None
|
||||
populated = False
|
||||
|
||||
if block_dir.is_dir():
|
||||
devs = list(block_dir.iterdir())
|
||||
if devs:
|
||||
device = devs[0].name
|
||||
populated = True
|
||||
else:
|
||||
# Also check the 'status' file — "not installed" means empty
|
||||
status = _read_sysfs(entry / "status")
|
||||
if status and status != "not installed":
|
||||
populated = True
|
||||
|
||||
slots.append({
|
||||
"slot": slot_num,
|
||||
"populated": populated,
|
||||
"device": device,
|
||||
})
|
||||
|
||||
return slots
|
||||
|
||||
|
||||
def _parse_slot_number(name: str) -> int | None:
|
||||
"""Extract the slot number from a sysfs slot directory name."""
|
||||
# Handles "Slot 00", "Slot00", "Disk 1", "ArrayDevice00", etc.
|
||||
for prefix in ("Slot ", "Slot", "Disk ", "Disk", "ArrayDevice", "SLOT "):
|
||||
if name.startswith(prefix):
|
||||
num_str = name[len(prefix):].strip()
|
||||
try:
|
||||
return int(num_str)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
Reference in New Issue
Block a user