Replace in-memory TTL cache with Redis
This commit is contained in:
25
build.sh
Executable file
25
build.sh
Executable file
@@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
IMAGE="docker.adamksmith.xyz/jbod-monitor"
|
||||
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
# Stage and commit all changes
|
||||
git add -A
|
||||
if git diff --cached --quiet; then
|
||||
echo "No changes to commit, using HEAD"
|
||||
else
|
||||
git commit -m "${1:-Build and push image}"
|
||||
fi
|
||||
|
||||
SHA=$(git rev-parse --short HEAD)
|
||||
|
||||
echo "Building ${IMAGE}:${SHA}"
|
||||
docker build -t "${IMAGE}:${SHA}" -t "${IMAGE}:latest" .
|
||||
|
||||
echo "Pushing ${IMAGE}:${SHA}"
|
||||
docker push "${IMAGE}:${SHA}"
|
||||
docker push "${IMAGE}:latest"
|
||||
|
||||
echo "Done: ${IMAGE}:${SHA}"
|
||||
@@ -15,3 +15,22 @@ services:
|
||||
- TZ=America/Denver
|
||||
- UVICORN_LOG_LEVEL=info
|
||||
- ZFS_USE_NSENTER=true
|
||||
- REDIS_HOST=127.0.0.1
|
||||
- REDIS_PORT=6379
|
||||
- REDIS_DB=0
|
||||
- SMART_CACHE_TTL=120
|
||||
- SMART_POLL_INTERVAL=90
|
||||
depends_on:
|
||||
- redis
|
||||
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
container_name: jbod-redis
|
||||
restart: unless-stopped
|
||||
network_mode: host
|
||||
volumes:
|
||||
- redis-data:/data
|
||||
command: redis-server --save 60 1 --loglevel warning
|
||||
|
||||
volumes:
|
||||
redis-data:
|
||||
|
||||
111
main.py
111
main.py
@@ -1,15 +1,20 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from models.schemas import HealthCheck
|
||||
from routers import drives, enclosures, leds, overview
|
||||
from services.smart import sg_ses_available, smartctl_available
|
||||
from services.cache import cache_set, close_cache, init_cache, redis_available
|
||||
from services.enclosure import discover_enclosures, list_slots
|
||||
from services.smart import SMART_CACHE_TTL, _run_smartctl, sg_ses_available, smartctl_available
|
||||
from services.zfs import get_zfs_pool_map
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -17,10 +22,96 @@ logging.basicConfig(
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SMART_POLL_INTERVAL = int(os.environ.get("SMART_POLL_INTERVAL", "90"))
|
||||
|
||||
_tool_status: dict[str, bool] = {}
|
||||
_poll_task: asyncio.Task | None = None
|
||||
|
||||
|
||||
async def smart_poll_loop():
|
||||
"""Pre-warm Redis with SMART data for all drives."""
|
||||
await asyncio.sleep(2) # let app finish starting
|
||||
while True:
|
||||
try:
|
||||
# Discover all enclosure devices
|
||||
enclosures_raw = discover_enclosures()
|
||||
devices: set[str] = set()
|
||||
for enc in enclosures_raw:
|
||||
for slot in list_slots(enc["id"]):
|
||||
if slot["device"]:
|
||||
devices.add(slot["device"])
|
||||
|
||||
# Discover host block devices via lsblk
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"lsblk", "-d", "-o", "NAME,TYPE", "-J",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, _ = await proc.communicate()
|
||||
if stdout:
|
||||
for dev in json.loads(stdout).get("blockdevices", []):
|
||||
if dev.get("type") == "disk":
|
||||
name = dev.get("name", "")
|
||||
if name and name not in devices:
|
||||
devices.add(name)
|
||||
except Exception as e:
|
||||
logger.warning("lsblk discovery failed in poller: %s", e)
|
||||
|
||||
# Poll each drive and cache result
|
||||
for device in sorted(devices):
|
||||
try:
|
||||
result = await _run_smartctl(device)
|
||||
await cache_set(f"jbod:smart:{device}", result, SMART_CACHE_TTL)
|
||||
except Exception as e:
|
||||
logger.warning("Poll failed for %s: %s", device, e)
|
||||
|
||||
# Pre-warm ZFS map (bypasses cache by calling directly)
|
||||
await get_zfs_pool_map()
|
||||
|
||||
logger.info("SMART poll complete: %d devices", len(devices))
|
||||
except Exception as e:
|
||||
logger.error("SMART poll loop error: %s", e)
|
||||
await asyncio.sleep(SMART_POLL_INTERVAL)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global _poll_task
|
||||
# Startup
|
||||
_tool_status["smartctl"] = smartctl_available()
|
||||
_tool_status["sg_ses"] = sg_ses_available()
|
||||
|
||||
if not _tool_status["smartctl"]:
|
||||
logger.warning("smartctl not found — install smartmontools for SMART data")
|
||||
if not _tool_status["sg_ses"]:
|
||||
logger.warning("sg_ses not found — install sg3-utils for enclosure SES data")
|
||||
if os.geteuid() != 0:
|
||||
logger.warning("Not running as root — smartctl may fail on some devices")
|
||||
|
||||
await init_cache()
|
||||
_tool_status["redis"] = redis_available()
|
||||
|
||||
if redis_available():
|
||||
_poll_task = asyncio.create_task(smart_poll_loop())
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
if _poll_task is not None:
|
||||
_poll_task.cancel()
|
||||
try:
|
||||
await _poll_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
await close_cache()
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="JBOD Monitor",
|
||||
description="Drive health monitoring for JBOD enclosures",
|
||||
version="0.1.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
app.add_middleware(
|
||||
@@ -35,24 +126,10 @@ app.include_router(drives.router)
|
||||
app.include_router(leds.router)
|
||||
app.include_router(overview.router)
|
||||
|
||||
_tool_status: dict[str, bool] = {}
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def check_dependencies():
|
||||
_tool_status["smartctl"] = smartctl_available()
|
||||
_tool_status["sg_ses"] = sg_ses_available()
|
||||
|
||||
if not _tool_status["smartctl"]:
|
||||
logger.warning("smartctl not found — install smartmontools for SMART data")
|
||||
if not _tool_status["sg_ses"]:
|
||||
logger.warning("sg_ses not found — install sg3-utils for enclosure SES data")
|
||||
if os.geteuid() != 0:
|
||||
logger.warning("Not running as root — smartctl may fail on some devices")
|
||||
|
||||
|
||||
@app.get("/api/health", response_model=HealthCheck, tags=["health"])
|
||||
async def health():
|
||||
_tool_status["redis"] = redis_available()
|
||||
return HealthCheck(status="ok", tools=_tool_status)
|
||||
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
fastapi>=0.115.0
|
||||
uvicorn>=0.34.0
|
||||
pydantic>=2.10.0
|
||||
redis>=5.0.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi import APIRouter, HTTPException, Response
|
||||
|
||||
from models.schemas import DriveDetail
|
||||
from services.smart import get_smart_data
|
||||
@@ -8,10 +8,10 @@ router = APIRouter(prefix="/api/drives", tags=["drives"])
|
||||
|
||||
|
||||
@router.get("/{device}", response_model=DriveDetail)
|
||||
async def get_drive_detail(device: str):
|
||||
async def get_drive_detail(device: str, response: Response):
|
||||
"""Get SMART detail for a specific block device."""
|
||||
try:
|
||||
data = await get_smart_data(device)
|
||||
data, cache_hit = await get_smart_data(device)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
@@ -25,4 +25,6 @@ async def get_drive_detail(device: str):
|
||||
data["zfs_vdev"] = zfs_info["vdev"]
|
||||
data["zfs_state"] = zfs_info.get("state")
|
||||
|
||||
response.headers["X-Cache"] = "HIT" if cache_hit else "MISS"
|
||||
|
||||
return DriveDetail(**data)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi import APIRouter, Response
|
||||
|
||||
from models.schemas import (
|
||||
DriveHealthSummary,
|
||||
@@ -22,7 +22,7 @@ router = APIRouter(prefix="/api/overview", tags=["overview"])
|
||||
|
||||
|
||||
@router.get("", response_model=Overview)
|
||||
async def get_overview():
|
||||
async def get_overview(response: Response):
|
||||
"""Aggregate view of all enclosures, slots, and drive health."""
|
||||
enclosures_raw = discover_enclosures()
|
||||
pool_map = await get_zfs_pool_map()
|
||||
@@ -53,12 +53,19 @@ async def get_overview():
|
||||
smart_results = await asyncio.gather(*smart_tasks, return_exceptions=True)
|
||||
|
||||
smart_map: dict[str, dict] = {}
|
||||
all_cache_hits = True
|
||||
any_lookups = False
|
||||
for (slot_info, dev), result in zip(populated, smart_results):
|
||||
if isinstance(result, Exception):
|
||||
logger.warning("SMART query failed for %s: %s", dev, result)
|
||||
smart_map[dev] = {"device": dev, "smart_supported": False}
|
||||
all_cache_hits = False
|
||||
else:
|
||||
smart_map[dev] = result
|
||||
data, hit = result
|
||||
smart_map[dev] = data
|
||||
any_lookups = True
|
||||
if not hit:
|
||||
all_cache_hits = False
|
||||
|
||||
slots_out: list[SlotWithDrive] = []
|
||||
for s in slots_raw:
|
||||
@@ -170,6 +177,8 @@ async def get_overview():
|
||||
warnings += 1
|
||||
host_drives_out.append(HostDrive(**hd))
|
||||
|
||||
response.headers["X-Cache"] = "HIT" if (any_lookups and all_cache_hits) else "MISS"
|
||||
|
||||
return Overview(
|
||||
healthy=all_healthy and errors == 0,
|
||||
drive_count=total_drives,
|
||||
|
||||
@@ -1,29 +1,62 @@
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import redis.asyncio as redis
|
||||
|
||||
class TTLCache:
|
||||
"""Simple in-memory TTL cache."""
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, ttl_seconds: int = 60):
|
||||
self._ttl = ttl_seconds
|
||||
self._store: dict[str, tuple[float, Any]] = {}
|
||||
_redis: redis.Redis | None = None
|
||||
|
||||
def get(self, key: str) -> Any | None:
|
||||
entry = self._store.get(key)
|
||||
if entry is None:
|
||||
|
||||
async def init_cache() -> None:
|
||||
"""Create Redis connection from environment variables."""
|
||||
global _redis
|
||||
host = os.environ.get("REDIS_HOST", "localhost")
|
||||
port = int(os.environ.get("REDIS_PORT", "6379"))
|
||||
db = int(os.environ.get("REDIS_DB", "0"))
|
||||
try:
|
||||
_redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
|
||||
await _redis.ping()
|
||||
logger.info("Redis connected at %s:%d/%d", host, port, db)
|
||||
except Exception as e:
|
||||
logger.warning("Redis connection failed: %s — running without cache", e)
|
||||
_redis = None
|
||||
|
||||
|
||||
async def close_cache() -> None:
|
||||
"""Close Redis connection."""
|
||||
global _redis
|
||||
if _redis is not None:
|
||||
await _redis.aclose()
|
||||
_redis = None
|
||||
|
||||
|
||||
def redis_available() -> bool:
|
||||
"""Return whether Redis connection is live."""
|
||||
return _redis is not None
|
||||
|
||||
|
||||
async def cache_get(key: str) -> Any | None:
|
||||
"""GET key from Redis, return deserialized value or None on miss/error."""
|
||||
if _redis is None:
|
||||
return None
|
||||
try:
|
||||
raw = await _redis.get(key)
|
||||
if raw is None:
|
||||
return None
|
||||
ts, value = entry
|
||||
if time.monotonic() - ts > self._ttl:
|
||||
del self._store[key]
|
||||
return None
|
||||
return value
|
||||
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
self._store[key] = (time.monotonic(), value)
|
||||
|
||||
def clear(self) -> None:
|
||||
self._store.clear()
|
||||
return json.loads(raw)
|
||||
except Exception as e:
|
||||
logger.warning("Redis GET %s failed: %s", key, e)
|
||||
return None
|
||||
|
||||
|
||||
smart_cache = TTLCache(ttl_seconds=60)
|
||||
async def cache_set(key: str, value: Any, ttl: int = 120) -> None:
|
||||
"""SET key in Redis with expiry, silently catches errors."""
|
||||
if _redis is None:
|
||||
return
|
||||
try:
|
||||
await _redis.set(key, json.dumps(value), ex=ttl)
|
||||
except Exception as e:
|
||||
logger.warning("Redis SET %s failed: %s", key, e)
|
||||
|
||||
@@ -65,12 +65,14 @@ async def get_host_drives() -> list[dict]:
|
||||
smart_results = await asyncio.gather(*smart_tasks, return_exceptions=True)
|
||||
|
||||
results: list[dict] = []
|
||||
for dev_info, smart in zip(host_devices, smart_results):
|
||||
for dev_info, smart_result in zip(host_devices, smart_results):
|
||||
name = dev_info["name"]
|
||||
|
||||
if isinstance(smart, Exception):
|
||||
logger.warning("SMART query failed for host drive %s: %s", name, smart)
|
||||
if isinstance(smart_result, Exception):
|
||||
logger.warning("SMART query failed for host drive %s: %s", name, smart_result)
|
||||
smart = {"device": name, "smart_supported": False}
|
||||
else:
|
||||
smart, _ = smart_result
|
||||
|
||||
# Compute health_status (same logic as overview.py)
|
||||
healthy = smart.get("smart_healthy")
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
||||
from services.cache import smart_cache
|
||||
from services.cache import cache_get, cache_set
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -17,6 +18,8 @@ ATTR_PENDING = 197
|
||||
ATTR_UNCORRECTABLE = 198
|
||||
ATTR_WEAR_LEVELING = 177 # SSD wear leveling
|
||||
|
||||
SMART_CACHE_TTL = int(os.environ.get("SMART_CACHE_TTL", "120"))
|
||||
|
||||
|
||||
def smartctl_available() -> bool:
|
||||
return shutil.which("smartctl") is not None
|
||||
@@ -26,19 +29,22 @@ def sg_ses_available() -> bool:
|
||||
return shutil.which("sg_ses") is not None
|
||||
|
||||
|
||||
async def get_smart_data(device: str) -> dict:
|
||||
"""Run smartctl -a -j against a device, with caching."""
|
||||
async def get_smart_data(device: str) -> tuple[dict, bool]:
|
||||
"""Run smartctl -a -j against a device, with caching.
|
||||
|
||||
Returns (data, cache_hit) tuple.
|
||||
"""
|
||||
# Sanitize device name: only allow alphanumeric and hyphens
|
||||
if not re.match(r"^[a-zA-Z0-9\-]+$", device):
|
||||
raise ValueError(f"Invalid device name: {device}")
|
||||
|
||||
cached = smart_cache.get(device)
|
||||
cached = await cache_get(f"jbod:smart:{device}")
|
||||
if cached is not None:
|
||||
return cached
|
||||
return (cached, True)
|
||||
|
||||
result = await _run_smartctl(device)
|
||||
smart_cache.set(device, result)
|
||||
return result
|
||||
await cache_set(f"jbod:smart:{device}", result, SMART_CACHE_TTL)
|
||||
return (result, False)
|
||||
|
||||
|
||||
async def _run_smartctl(device: str) -> dict:
|
||||
|
||||
@@ -4,11 +4,15 @@ import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from services.cache import cache_get, cache_set
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Allow overriding the zpool binary path via env (for bind-mounted host tools)
|
||||
ZPOOL_BIN = os.environ.get("ZPOOL_BIN", "zpool")
|
||||
|
||||
ZFS_CACHE_TTL = 300
|
||||
|
||||
|
||||
async def get_zfs_pool_map() -> dict[str, dict]:
|
||||
"""Return a dict mapping device names to ZFS pool and vdev info.
|
||||
@@ -16,6 +20,10 @@ async def get_zfs_pool_map() -> dict[str, dict]:
|
||||
e.g. {"sda": {"pool": "tank", "vdev": "raidz2-0"},
|
||||
"sdb": {"pool": "fast", "vdev": "mirror-0"}}
|
||||
"""
|
||||
cached = await cache_get("jbod:zfs_map")
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
pool_map = {}
|
||||
try:
|
||||
# When running in a container with pid:host, use nsenter to run
|
||||
@@ -94,6 +102,8 @@ async def get_zfs_pool_map() -> dict[str, dict]:
|
||||
pass
|
||||
except FileNotFoundError:
|
||||
logger.debug("zpool not available")
|
||||
|
||||
await cache_set("jbod:zfs_map", pool_map, ZFS_CACHE_TTL)
|
||||
return pool_map
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user