import asyncio import os import logging import re from pathlib import Path logger = logging.getLogger(__name__) # Allow overriding the zpool binary path via env (for bind-mounted host tools) ZPOOL_BIN = os.environ.get("ZPOOL_BIN", "zpool") async def get_zfs_pool_map() -> dict[str, dict]: """Return a dict mapping device names to ZFS pool and vdev info. e.g. {"sda": {"pool": "tank", "vdev": "raidz2-0"}, "sdb": {"pool": "fast", "vdev": "mirror-0"}} """ pool_map = {} try: # When running in a container with pid:host, use nsenter to run # zpool in the host mount namespace so it finds its own libs. use_nsenter = os.environ.get("ZFS_USE_NSENTER", "").lower() in ("1", "true") if use_nsenter: cmd = ["nsenter", "-t", "1", "-m", "--", "zpool", "status", "-P"] else: cmd = [ZPOOL_BIN, "status", "-P"] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await proc.communicate() if proc.returncode != 0: return pool_map current_pool = None current_vdev = None in_config = False for line in stdout.decode().splitlines(): stripped = line.strip() if stripped.startswith("pool:"): current_pool = stripped.split(":", 1)[1].strip() current_vdev = None in_config = False continue if stripped.startswith("NAME") and "STATE" in stripped: in_config = True continue if stripped.startswith("errors:") or stripped == "": if stripped.startswith("errors:"): in_config = False continue if not in_config or not current_pool: continue # Indentation: 1 tab = pool name, 1 tab + 2 spaces = vdev, # 1 tab + 4 spaces = device. Count chars before content. leading = len(line) - len(line.lstrip()) if "/dev/" not in stripped and leading == 3: # This is a vdev line (mirror-0, raidz2-0, etc.) current_vdev = stripped.split()[0] elif "/dev/" in stripped: parts = stripped.split() dev_path = parts[0] try: dev_state = parts[1] if len(parts) > 1 else None info = { "pool": current_pool, "vdev": current_vdev or current_pool, "state": dev_state, } # Resolve symlink and map the device real = os.path.realpath(dev_path) dev_name = os.path.basename(real) # Map the resolved device (strip partition suffix) base_dev = _strip_partition(dev_name) pool_map[base_dev] = info # For device-mapper (multipath), also map the # underlying slave sd devices. if base_dev.startswith("dm-"): for slave in _resolve_dm_slaves(base_dev): pool_map[slave] = info except Exception: pass except FileNotFoundError: logger.debug("zpool not available") return pool_map def _strip_partition(dev_name: str) -> str: """Strip partition suffix from a device name. sda1 -> sda, nvme0n1p1 -> nvme0n1, dm-14 stays dm-14 (it's a separate dm device for the partition, resolve via slaves). """ # dm devices: partition dm devices are separate dm-N entries, # resolve via /sys/block/dm-N/slaves to find parent dm device if dev_name.startswith("dm-"): parent = _get_dm_parent(dev_name) return parent if parent else dev_name # NVMe: nvme0n1p1 -> nvme0n1 m = re.match(r'^(nvme\d+n\d+)p\d+$', dev_name) if m: return m.group(1) # Standard: sda1 -> sda return re.sub(r'\d+$', '', dev_name) def _get_dm_parent(dm_name: str) -> str | None: """For a dm partition device, find the parent dm device via slaves.""" slave_dir = Path(f"/sys/block/{dm_name}/slaves") if slave_dir.is_dir(): slaves = list(slave_dir.iterdir()) if len(slaves) == 1 and slaves[0].name.startswith("dm-"): return slaves[0].name return None def _resolve_dm_slaves(dm_name: str) -> list[str]: """Get underlying sd device names for a device-mapper device.""" slaves = [] slave_dir = Path(f"/sys/block/{dm_name}/slaves") if slave_dir.is_dir(): for s in slave_dir.iterdir(): name = s.name if name.startswith("dm-"): # Nested dm — recurse slaves.extend(_resolve_dm_slaves(name)) else: slaves.append(name) return slaves