import asyncio
import logging
import os
import re

logger = logging.getLogger(__name__)

# Allow overriding the zpool binary path via env (for bind-mounted host tools)
ZPOOL_BIN = os.environ.get("ZPOOL_BIN", "zpool")

# Disks named with a trailing letter append the partition number directly
# (sda1 -> sda, vdb2 -> vdb, xvda1 -> xvda).
_LETTER_DISK_PARTITION = re.compile(r"((?:sd|vd|xvd|hd)[a-z]+)\d+")
# Disks whose own name ends in a digit separate the partition number with "p"
# (nvme0n1p1 -> nvme0n1, mmcblk0p2 -> mmcblk0, loop0p1 -> loop0).
_DIGIT_DISK_PARTITION = re.compile(r"(.+\d)p\d+")


def _strip_partition_suffix(dev_name: str) -> str:
    """Return the whole-disk name for *dev_name*, dropping a partition suffix.

    Names that do not look like a partition (e.g. "sda", "nvme0n1", "dm-0")
    are returned unchanged, so digits that belong to the disk name itself are
    never stripped.
    """
    for pattern in (_LETTER_DISK_PARTITION, _DIGIT_DISK_PARTITION):
        match = pattern.fullmatch(dev_name)
        if match:
            return match.group(1)
    return dev_name


def _parse_zpool_status(output: str) -> dict[str, str]:
    """Build the device-name -> pool-name map from `zpool status -P` text."""
    pool_map = {}
    current_pool = None
    for line in output.splitlines():
        stripped = line.strip()
        if stripped.startswith("pool:"):
            current_pool = stripped.split(":", 1)[1].strip()
            continue
        # Only lines that *begin* with a /dev/ path describe a present vdev.
        # Lines that merely mention one (e.g. "<guid> UNAVAIL ... was
        # /dev/sdb1") start with a different token and must be skipped.
        if not current_pool or not stripped.startswith("/dev/"):
            continue
        dev_path = stripped.split()[0]
        try:
            # Resolve by-id / by-path style symlinks to the kernel device node.
            real = os.path.realpath(dev_path)
        except (OSError, ValueError) as exc:
            logger.debug("could not resolve %s: %s", dev_path, exc)
            continue
        dev_name = _strip_partition_suffix(os.path.basename(real))
        if dev_name:
            pool_map[dev_name] = current_pool
    return pool_map


async def get_zfs_pool_map() -> dict[str, str]:
    """Return a dict mapping device names to ZFS pool names.

    e.g. {"sda": "tank", "nvme0n1": "tank", "sdc": "fast"}

    Runs `zpool status -P` (through `nsenter` into PID 1's mount namespace
    when the ZFS_USE_NSENTER environment variable is "1" or "true") and maps
    every vdev listed by a /dev/ path to the pool it belongs to. Partition
    suffixes are removed so the keys are whole-disk names.

    Returns an empty dict when the command cannot be found or exits with a
    non-zero status; this is a best-effort lookup and does not raise for
    those cases.
    """
    # When running in a container with pid:host, use nsenter to run
    # zpool in the host mount namespace so it finds its own libs.
    use_nsenter = os.environ.get("ZFS_USE_NSENTER", "").lower() in ("1", "true")
    if use_nsenter:
        cmd = ["nsenter", "-t", "1", "-m", "--", "zpool", "status", "-P"]
    else:
        cmd = [ZPOOL_BIN, "status", "-P"]

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
    except FileNotFoundError:
        logger.debug("zpool not available")
        return {}

    if proc.returncode != 0:
        logger.debug(
            "%s exited with status %s: %s",
            cmd[0],
            proc.returncode,
            stderr.decode(errors="replace").strip(),
        )
        return {}

    # Tolerate unexpected bytes rather than failing the whole lookup.
    return _parse_zpool_status(stdout.decode(errors="replace"))