import asyncio
import json
import logging

from services.enclosure import discover_enclosures, list_slots
from services.smart import get_smart_data, scan_megaraid_drives
from services.zfs import get_zfs_pool_map

logger = logging.getLogger(__name__)

# SMART counters whose non-zero value downgrades a drive to "warning".
_ERROR_COUNTER_KEYS = (
    "reallocated_sectors",
    "pending_sectors",
    "uncorrectable_errors",
)


def _classify_drive_type(name: str, dev: dict) -> str:
    """Return "nvme", "raid", "ssd" or "hdd" for one lsblk device entry.

    Args:
        name: Kernel device name taken from the lsblk entry.
        dev: The lsblk JSON object for the device; its "tran", "model" and
            "rota" fields are read and may each be missing or null.
    """
    tran = (dev.get("tran") or "").lower()
    model = (dev.get("model") or "").lower()
    rota = dev.get("rota")

    if tran == "nvme" or name.startswith("nvme"):
        return "nvme"
    # "raid" also matches models containing "megaraid".
    if "perc" in model or "raid" in model:
        return "raid"
    # Accept the non-rotational flag as a boolean, an integer or the string
    # "0"; anything else (including a missing flag) is treated as rotational.
    if rota is False or rota == 0 or rota == "0":
        return "ssd"
    return "hdd"


def _health_status(record: dict, *, unknown_is_warning: bool = False) -> str:
    """Derive "error", "warning" or "healthy" from a SMART record.

    Args:
        record: Mapping that may carry "smart_healthy", "smart_supported" and
            the counters listed in ``_ERROR_COUNTER_KEYS``.
        unknown_is_warning: When true, a drive whose "smart_healthy" is None
            while "smart_supported" is truthy (default True) is reported as
            "warning" rather than "healthy".
    """
    healthy = record.get("smart_healthy")
    if healthy is False:
        return "error"
    if any((record.get(key) or 0) > 0 for key in _ERROR_COUNTER_KEYS):
        return "warning"
    if unknown_is_warning and healthy is None and record.get("smart_supported", True):
        return "warning"
    return "healthy"


async def get_host_drives() -> list[dict]:
    """Discover non-enclosure block devices and return SMART data for each.

    Runs ``lsblk`` to list block devices, drops everything that is not of
    type "disk" or that is already mapped to an enclosure slot, then queries
    SMART data and the ZFS pool map for the remaining devices.

    Returns:
        One dict per host drive with identification, SMART, ZFS and
        "health_status" fields plus a "physical_drives" list. That list is
        filled with the result of ``scan_megaraid_drives()`` for the first
        RAID drive without SMART support and is empty for all other drives.
        An empty list is returned when lsblk cannot be started or its output
        is not valid JSON.

    Raises:
        Whatever ``get_zfs_pool_map()`` or ``scan_megaraid_drives()`` raise;
        per-drive SMART failures are logged and do not propagate.
    """
    # Get all block devices via lsblk
    try:
        proc = await asyncio.create_subprocess_exec(
            "lsblk", "-d", "-o", "NAME,SIZE,TYPE,MODEL,ROTA,TRAN", "-J",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        lsblk_data = json.loads(stdout)
    except (OSError, json.JSONDecodeError) as e:
        # OSError covers a missing binary (FileNotFoundError) as well as
        # other spawn failures such as PermissionError.
        logger.warning("lsblk failed: %s", e)
        return []

    # Collect all enclosure-mapped devices
    enclosure_devices: set[str] = set()
    for enc in discover_enclosures():
        for slot in list_slots(enc["id"]):
            if slot["device"]:
                enclosure_devices.add(slot["device"])

    # Filter to host-only disks
    host_devices: list[dict] = []
    for dev in lsblk_data.get("blockdevices", []):
        name = dev.get("name", "")
        # Skip non-disk types and enclosure drives
        if dev.get("type", "") != "disk" or name in enclosure_devices:
            continue
        host_devices.append(
            {"name": name, "drive_type": _classify_drive_type(name, dev)}
        )

    # Fetch SMART + ZFS data concurrently. Exceptions are collected so that
    # one failing SMART query does not abort the others.
    pool_result, *smart_results = await asyncio.gather(
        get_zfs_pool_map(),
        *(get_smart_data(d["name"]) for d in host_devices),
        return_exceptions=True,
    )
    if isinstance(pool_result, BaseException):
        # A ZFS lookup failure is not handled here; surface it to the caller.
        raise pool_result
    pool_map = pool_result

    results: list[dict] = []
    for dev_info, smart_result in zip(host_devices, smart_results):
        name = dev_info["name"]
        if isinstance(smart_result, Exception):
            logger.warning(
                "SMART query failed for host drive %s: %s", name, smart_result
            )
            smart = {"device": name, "smart_supported": False}
        else:
            # get_smart_data yields a 2-tuple; only the first item is used.
            smart, _ = smart_result

        zfs_info = pool_map.get(name, {})
        results.append({
            "device": name,
            "drive_type": dev_info["drive_type"],
            "model": smart.get("model"),
            "serial": smart.get("serial"),
            "wwn": smart.get("wwn"),
            "firmware": smart.get("firmware"),
            "capacity_bytes": smart.get("capacity_bytes"),
            "smart_healthy": smart.get("smart_healthy"),
            "smart_supported": smart.get("smart_supported", True),
            "temperature_c": smart.get("temperature_c"),
            "power_on_hours": smart.get("power_on_hours"),
            "reallocated_sectors": smart.get("reallocated_sectors"),
            "pending_sectors": smart.get("pending_sectors"),
            "uncorrectable_errors": smart.get("uncorrectable_errors"),
            "zfs_pool": zfs_info.get("pool"),
            "zfs_vdev": zfs_info.get("vdev"),
            "zfs_state": zfs_info.get("state"),
            # Same logic as overview.py
            "health_status": _health_status(smart, unknown_is_warning=True),
            "physical_drives": [],
        })

    # Discover physical drives behind RAID controllers. They are attached to
    # the first RAID host drive that does not report SMART data itself.
    raid_target = next(
        (
            r for r in results
            if r["drive_type"] == "raid" and not r["smart_supported"]
        ),
        None,
    )
    if raid_target is not None:
        megaraid_drives = await scan_megaraid_drives()
        for pd in megaraid_drives:
            pd["health_status"] = _health_status(pd)
            pd["drive_type"] = "physical"
            pd["physical_drives"] = []
        raid_target["physical_drives"] = megaraid_drives

    return results