"""Collect drive health information by running ``smartctl`` and parsing its JSON output."""

import asyncio
import json
import logging
import re
import shutil

from services.cache import smart_cache

logger = logging.getLogger(__name__)

# SMART attribute IDs of interest
ATTR_REALLOCATED = 5
ATTR_POWER_ON_HOURS = 9
ATTR_TEMPERATURE = 194
ATTR_TEMPERATURE_ALT = 190
ATTR_PENDING = 197
ATTR_UNCORRECTABLE = 198
ATTR_WEAR_LEVELING = 177  # SSD wear leveling

# Device names are interpolated into "/dev/<name>", so only alphanumerics and
# hyphens are accepted. The pattern is applied with fullmatch(): a "^...$"
# pattern used with match() would also accept a trailing newline, because "$"
# matches just before a final "\n".
_DEVICE_NAME_RE = re.compile(r"[a-zA-Z0-9\-]+")


def smartctl_available() -> bool:
    """Return True when a ``smartctl`` executable is found on PATH."""
    return shutil.which("smartctl") is not None


def sg_ses_available() -> bool:
    """Return True when an ``sg_ses`` executable is found on PATH."""
    return shutil.which("sg_ses") is not None


async def get_smart_data(device: str) -> dict:
    """Run smartctl -a -j against a device, with caching.

    Args:
        device: Bare device name (for example ``sda``), without the ``/dev/``
            prefix. Only alphanumeric characters and hyphens are allowed.

    Returns:
        The parsed SMART summary, or a dict with ``error`` and
        ``smart_supported: False`` when smartctl could not be run or parsed.
        Whatever is returned is also stored in ``smart_cache`` under ``device``.

    Raises:
        ValueError: If ``device`` contains characters outside the allowed set.
    """
    if not _DEVICE_NAME_RE.fullmatch(device):
        raise ValueError(f"Invalid device name: {device}")

    cached = smart_cache.get(device)
    if cached is not None:
        return cached

    result = await _run_smartctl(device)
    smart_cache.set(device, result)
    return result


async def _run_smartctl(device: str) -> dict:
    """Execute smartctl and parse JSON output.

    Failures are reported as a dict with ``error`` and
    ``smart_supported: False`` rather than raised.
    """
    dev_path = f"/dev/{device}"
    try:
        # Only spawning the process can raise FileNotFoundError, so the try
        # body is limited to it.
        proc = await asyncio.create_subprocess_exec(
            "smartctl",
            "-a",
            "-j",
            dev_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError:
        return {"error": "smartctl not found", "smart_supported": False}

    stdout, stderr = await proc.communicate()

    # smartctl returns non-zero for many non-fatal reasons (bit flags).
    # The JSON output is still valid even with non-zero exit codes.
    if not stdout:
        logger.warning(
            "smartctl produced no output for %s (exit code %s): %s",
            dev_path,
            proc.returncode,
            stderr.decode(errors="replace").strip(),
        )
        return {
            "error": f"smartctl produced no output (exit code {proc.returncode})",
            "smart_supported": False,
        }

    try:
        data = json.loads(stdout)
    except ValueError:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError that json.loads raises for undecodable bytes.
        logger.warning("Failed to parse smartctl JSON output for %s", dev_path)
        return {"error": "Failed to parse smartctl JSON output", "smart_supported": False}

    if not isinstance(data, dict):
        # Valid JSON, but not the object the parser below expects.
        logger.warning("Unexpected smartctl JSON top-level type for %s", dev_path)
        return {"error": "Failed to parse smartctl JSON output", "smart_supported": False}

    return _parse_smart_json(device, data)


def _section(data: dict, key: str) -> dict:
    """Return ``data[key]`` when it is a dict, otherwise an empty dict.

    This tolerates both a missing key and a key that is present with a JSON
    ``null`` (or any other non-object) value.
    """
    value = data.get(key)
    return value if isinstance(value, dict) else {}


def _parse_smart_json(device: str, data: dict) -> dict:
    """Extract relevant fields from smartctl JSON output.

    Args:
        device: Device name, copied into the result as ``device``.
        data: Decoded smartctl JSON object.

    Returns:
        A flat dict with identity, health, temperature, power-on hours and
        attribute information; fields that are not present are ``None``.
    """
    result: dict = {"device": device, "smart_supported": True}

    # Identity — ATA vs SCSI field names
    result["model"] = data.get("model_name") or data.get("scsi_model_name")
    result["serial"] = data.get("serial_number")
    result["firmware"] = data.get("firmware_version") or data.get("scsi_revision")
    result["capacity_bytes"] = _section(data, "user_capacity").get("bytes")

    # WWN — ATA uses structured wwn{}, SCSI uses logical_unit_id string
    wwn_data = _section(data, "wwn")
    if wwn_data:
        naa = wwn_data.get("naa", 0)
        oui = wwn_data.get("oui", 0)
        wwn_id = wwn_data.get("id", 0)
        result["wwn"] = f"{naa:x}{oui:06x}{wwn_id:09x}"
    elif data.get("logical_unit_id"):
        result["wwn"] = data["logical_unit_id"]

    # SMART health
    result["smart_healthy"] = _section(data, "smart_status").get("passed")

    # Temperature
    result["temperature_c"] = _section(data, "temperature").get("current")

    # Power-on hours
    result["power_on_hours"] = _section(data, "power_on_time").get("hours")

    # SMART attributes (ATA); a null table is treated like a missing one
    attrs = _section(data, "ata_smart_attributes").get("table") or []
    result["smart_attributes"] = attrs
    result["reallocated_sectors"] = _get_attr_raw(attrs, ATTR_REALLOCATED)
    result["pending_sectors"] = _get_attr_raw(attrs, ATTR_PENDING)
    result["uncorrectable_errors"] = _get_attr_raw(attrs, ATTR_UNCORRECTABLE)
    result["wear_leveling_percent"] = _get_attr_value(attrs, ATTR_WEAR_LEVELING)

    # Power-on hours fallback from attributes
    if result["power_on_hours"] is None:
        result["power_on_hours"] = _get_attr_raw(attrs, ATTR_POWER_ON_HOURS)

    # Temperature fallback from attributes
    if result["temperature_c"] is None:
        result["temperature_c"] = _get_attr_temperature(attrs, ATTR_TEMPERATURE)
    if result["temperature_c"] is None:
        result["temperature_c"] = _get_attr_temperature(attrs, ATTR_TEMPERATURE_ALT)

    # NVMe attributes (different structure)
    nvme_health = data.get("nvme_smart_health_information_log")
    if nvme_health:
        result["smart_attributes"] = [{"nvme_health": nvme_health}]
        if result["temperature_c"] is None:
            result["temperature_c"] = nvme_health.get("temperature")
        if result["power_on_hours"] is None:
            result["power_on_hours"] = nvme_health.get("power_on_hours")
        if result["wear_leveling_percent"] is None:
            pct_used = nvme_health.get("percentage_used")
            if pct_used is not None:
                # percentage_used is allowed to exceed 100 on a drive past its
                # rated endurance; clamp so the remaining life never goes
                # negative.
                result["wear_leveling_percent"] = max(0, 100 - pct_used)

    # SAS/SCSI drives use different error counters
    scsi_errors = data.get("scsi_error_counter_log")
    if scsi_errors and not attrs:
        result["smart_attributes"] = [{"scsi_error_log": scsi_errors}]

    return result


def _get_attr_raw(attrs: list[dict], attr_id: int) -> int | None:
    """Get the raw_value for a SMART attribute by ID.

    Returns ``None`` when the attribute is absent or carries no raw value.
    """
    for attr in attrs:
        if attr.get("id") == attr_id:
            raw = attr.get("raw") or {}
            return raw.get("value")
    return None


def _get_attr_temperature(attrs: list[dict], attr_id: int) -> int | None:
    """Get the current temperature from a temperature attribute's raw value.

    Some drives pack min/max readings into the upper bytes of the raw value;
    the current temperature is the low byte (smartmontools decodes it the same
    way), so only that byte is kept.
    """
    raw = _get_attr_raw(attrs, attr_id)
    if isinstance(raw, int):
        return raw & 0xFF
    return raw


def _get_attr_value(attrs: list[dict], attr_id: int) -> int | None:
    """Get the normalized value for a SMART attribute by ID.

    Returns ``None`` when the attribute is absent.
    """
    for attr in attrs:
        if attr.get("id") == attr_id:
            return attr.get("value")
    return None