Files
jbod-monitor/services/smart.py

160 lines
5.2 KiB
Python

import asyncio
import json
import logging
import re
import shutil
from services.cache import smart_cache
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# SMART attribute IDs of interest.
# Each is compared against the "id" field of the entries in smartctl's
# "ata_smart_attributes" table when extracting summary values.
ATTR_REALLOCATED = 5  # raw value is reported as "reallocated_sectors"
ATTR_POWER_ON_HOURS = 9  # fallback source for "power_on_hours"
ATTR_TEMPERATURE = 194  # first fallback source for "temperature_c"
ATTR_TEMPERATURE_ALT = 190  # second fallback source for "temperature_c"
ATTR_PENDING = 197  # raw value is reported as "pending_sectors"
ATTR_UNCORRECTABLE = 198  # raw value is reported as "uncorrectable_errors"
ATTR_WEAR_LEVELING = 177 # SSD wear leveling; normalized value is reported as "wear_leveling_percent"
def smartctl_available() -> bool:
    """Report whether a ``smartctl`` executable can be located on PATH."""
    located = shutil.which("smartctl")
    return located is not None
def sg_ses_available() -> bool:
    """Report whether an ``sg_ses`` executable can be located on PATH."""
    located = shutil.which("sg_ses")
    return located is not None
async def get_smart_data(device: str) -> dict:
    """Return SMART data for a device, serving from the cache when possible.

    Args:
        device: Device name without the ``/dev/`` prefix (e.g. ``sda``,
            ``nvme0n1``). Only ASCII letters, digits and hyphens are allowed.

    Returns:
        The dict produced by ``_run_smartctl`` (either parsed SMART fields or
        an ``"error"`` entry). Whatever is returned is stored in
        ``smart_cache`` under the device name, error results included.

    Raises:
        ValueError: If ``device`` is empty or contains any other character.
    """
    # fullmatch, not match + "$": "$" also matches just before a trailing
    # newline, so a name such as "sda\n" would otherwise pass validation and
    # end up in the device path and the cache key.
    if not re.fullmatch(r"[a-zA-Z0-9\-]+", device):
        raise ValueError(f"Invalid device name: {device}")

    cached = smart_cache.get(device)
    if cached is not None:
        return cached

    result = await _run_smartctl(device)
    smart_cache.set(device, result)
    return result
async def _run_smartctl(device: str) -> dict:
    """Run ``smartctl -a -j /dev/<device>`` and return the parsed summary.

    Args:
        device: Device name without the ``/dev/`` prefix. It is interpolated
            into the device path as-is, so callers must validate it first.

    Returns:
        The dict built by ``_parse_smart_json`` on success; otherwise a dict
        with an ``"error"`` message and ``"smart_supported": False``.
    """
    dev_path = f"/dev/{device}"
    try:
        proc = await asyncio.create_subprocess_exec(
            "smartctl", "-a", "-j", dev_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
    except FileNotFoundError:
        return {"error": "smartctl not found", "smart_supported": False}

    # smartctl returns non-zero for many non-fatal reasons (bit flags).
    # The JSON output is still valid even with non-zero exit codes, so the
    # exit code alone is never treated as a failure.
    if not stdout:
        # Keep whatever smartctl wrote to stderr; it is the only diagnostic
        # available when no JSON was produced.
        logger.warning(
            "smartctl produced no output for %s (exit code %s): %s",
            dev_path,
            proc.returncode,
            (stderr or b"").decode(errors="replace").strip(),
        )
        return {
            "error": f"smartctl produced no output (exit code {proc.returncode})",
            "smart_supported": False,
        }

    try:
        data = json.loads(stdout)
    except ValueError:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised when the bytes cannot be decoded.
        logger.warning("Could not parse smartctl JSON output for %s", dev_path)
        return {"error": "Failed to parse smartctl JSON output", "smart_supported": False}

    if not isinstance(data, dict):
        # The parser expects a JSON object; any other document is unusable.
        logger.warning("smartctl JSON output for %s is not an object", dev_path)
        return {"error": "Failed to parse smartctl JSON output", "smart_supported": False}

    return _parse_smart_json(device, data)
def _parse_smart_json(device: str, data: dict) -> dict:
    """Extract the fields the monitor reports from smartctl's JSON output.

    Args:
        device: Device name; echoed back under the ``"device"`` key.
        data: Decoded JSON object produced by ``smartctl -a -j``.

    Returns:
        A flat dict with identity, health, temperature, power-on hours and
        sector/wear counters. Fields smartctl did not report are ``None``;
        ``"wwn"`` is only present when smartctl reported one.
        ``"smart_attributes"`` holds the ATA attribute table, or a
        single-element list wrapping the NVMe health log or the SCSI error
        counter log when those are present.
    """
    result: dict = {"device": device, "smart_supported": True}

    # Identity. ``or {}`` tolerates a section that is present but null.
    result["model"] = data.get("model_name")
    result["serial"] = data.get("serial_number")
    result["firmware"] = data.get("firmware_version")
    result["capacity_bytes"] = (data.get("user_capacity") or {}).get("bytes")

    # WWN
    wwn_data = data.get("wwn")
    if wwn_data:
        # Reconstruct WWN string from components
        naa = wwn_data.get("naa", 0)
        oui = wwn_data.get("oui", 0)
        wwn_id = wwn_data.get("id", 0)
        result["wwn"] = f"{naa:x}{oui:06x}{wwn_id:09x}"

    # SMART health
    smart_status = data.get("smart_status") or {}
    result["smart_healthy"] = smart_status.get("passed")

    # Temperature
    temp = data.get("temperature") or {}
    result["temperature_c"] = temp.get("current")

    # Power-on hours
    poh = data.get("power_on_time") or {}
    result["power_on_hours"] = poh.get("hours")

    # SMART attributes (ATA)
    attrs = (data.get("ata_smart_attributes") or {}).get("table") or []
    result["smart_attributes"] = attrs
    result["reallocated_sectors"] = _get_attr_raw(attrs, ATTR_REALLOCATED)
    result["pending_sectors"] = _get_attr_raw(attrs, ATTR_PENDING)
    result["uncorrectable_errors"] = _get_attr_raw(attrs, ATTR_UNCORRECTABLE)
    result["wear_leveling_percent"] = _get_attr_value(attrs, ATTR_WEAR_LEVELING)

    # Power-on hours fallback from attributes
    # NOTE(review): some drives pack extra fields into this raw value too;
    # it is passed through unchanged here.
    if result["power_on_hours"] is None:
        result["power_on_hours"] = _get_attr_raw(attrs, ATTR_POWER_ON_HOURS)

    # Temperature fallback from attributes: primary ID first, then the
    # alternate one.
    if result["temperature_c"] is None:
        for temp_attr_id in (ATTR_TEMPERATURE, ATTR_TEMPERATURE_ALT):
            raw_temp = _get_attr_raw(attrs, temp_attr_id)
            if raw_temp is None:
                continue
            # Many drives pack min/max readings into the upper bytes of the
            # raw value; the current temperature is the low byte.
            if isinstance(raw_temp, int):
                raw_temp &= 0xFF
            result["temperature_c"] = raw_temp
            break

    # NVMe attributes (different structure)
    nvme_health = data.get("nvme_smart_health_information_log")
    if nvme_health:
        result["smart_attributes"] = [{"nvme_health": nvme_health}]
        if result["temperature_c"] is None:
            result["temperature_c"] = nvme_health.get("temperature")
        if result["power_on_hours"] is None:
            result["power_on_hours"] = nvme_health.get("power_on_hours")
        if result["wear_leveling_percent"] is None:
            pct_used = nvme_health.get("percentage_used")
            if pct_used is not None:
                # "percentage_used" may exceed 100 on a drive past its rated
                # endurance; clamp so remaining life never goes negative.
                result["wear_leveling_percent"] = max(0, 100 - pct_used)

    # SAS/SCSI drives use different error counters
    scsi_errors = data.get("scsi_error_counter_log")
    if scsi_errors and not attrs:
        result["smart_attributes"] = [{"scsi_error_log": scsi_errors}]

    return result
def _get_attr_raw(attrs: list[dict], attr_id: int) -> int | None:
"""Get the raw_value for a SMART attribute by ID."""
for attr in attrs:
if attr.get("id") == attr_id:
raw = attr.get("raw", {})
return raw.get("value")
return None
def _get_attr_value(attrs: list[dict], attr_id: int) -> int | None:
"""Get the normalized value for a SMART attribute by ID."""
for attr in attrs:
if attr.get("id") == attr_id:
return attr.get("value")
return None