""" Polling loop — fetches TrueNAS state every POLL_INTERVAL_SECONDS and normalizes it into SQLite. Design notes: - Opens its own DB connection per cycle (WAL allows concurrent readers). - Skips a cycle if TrueNAS is unreachable; marks poller unhealthy. - Never overwrites a 'running' state with stale history. """ import asyncio import logging from datetime import datetime, timezone, timedelta from typing import Any import aiosqlite from app.config import settings from app.truenas import TrueNASClient log = logging.getLogger(__name__) # Shared state read by the /health endpoint _state: dict[str, Any] = { "last_poll_at": None, "last_error": None, "healthy": False, "drives_seen": 0, "consecutive_failures": 0, } # SSE subscriber queues — notified after each successful poll _subscribers: list[asyncio.Queue] = [] def get_state() -> dict: return _state.copy() def subscribe() -> asyncio.Queue: q: asyncio.Queue = asyncio.Queue(maxsize=1) _subscribers.append(q) return q def unsubscribe(q: asyncio.Queue) -> None: try: _subscribers.remove(q) except ValueError: pass def _notify_subscribers(alert: dict | None = None) -> None: payload = {"alert": alert} for q in list(_subscribers): try: q.put_nowait(payload) except asyncio.QueueFull: pass # Client is behind; skip this update def _now() -> str: return datetime.now(timezone.utc).isoformat() def _eta_from_progress(percent: int, started_iso: str | None) -> str | None: """Linear ETA extrapolation from elapsed time and percent complete.""" if not started_iso or percent <= 0: return None try: start = datetime.fromisoformat(started_iso) if start.tzinfo is None: start = start.replace(tzinfo=timezone.utc) elapsed = (datetime.now(timezone.utc) - start).total_seconds() total_est = elapsed / (percent / 100) remaining = max(0.0, total_est - elapsed) return (datetime.now(timezone.utc) + timedelta(seconds=remaining)).isoformat() except Exception: return None def _map_history_state(status: str) -> str: return "passed" if "without error" in status.lower() else "failed" # --------------------------------------------------------------------------- # DB helpers # --------------------------------------------------------------------------- async def _upsert_drive(db: aiosqlite.Connection, disk: dict, now: str) -> int: await db.execute( """ INSERT INTO drives (truenas_disk_id, devname, serial, model, size_bytes, temperature_c, smart_health, last_seen_at, last_polled_at) VALUES (?,?,?,?,?,?,?,?,?) ON CONFLICT(truenas_disk_id) DO UPDATE SET temperature_c = excluded.temperature_c, smart_health = excluded.smart_health, last_seen_at = excluded.last_seen_at, last_polled_at = excluded.last_polled_at """, ( disk["identifier"], disk["devname"], disk.get("serial"), disk.get("model"), disk.get("size"), disk.get("temperature"), disk.get("smart_health", "UNKNOWN"), now, now, ), ) cur = await db.execute( "SELECT id FROM drives WHERE truenas_disk_id = ?", (disk["identifier"],) ) row = await cur.fetchone() return row["id"] async def _upsert_test(db: aiosqlite.Connection, drive_id: int, ttype: str, data: dict) -> None: await db.execute( """ INSERT INTO smart_tests (drive_id, test_type, state, percent, truenas_job_id, started_at, eta_at, finished_at, error_text) VALUES (?,?,?,?,?,?,?,?,?) ON CONFLICT(drive_id, test_type) DO UPDATE SET state = excluded.state, percent = excluded.percent, truenas_job_id = excluded.truenas_job_id, started_at = COALESCE(excluded.started_at, smart_tests.started_at), eta_at = excluded.eta_at, finished_at = excluded.finished_at, error_text = excluded.error_text """, ( drive_id, ttype, data["state"], data.get("percent", 0), data.get("truenas_job_id"), data.get("started_at"), data.get("eta_at"), data.get("finished_at"), data.get("error_text"), ), ) async def _apply_running_job( db: aiosqlite.Connection, drive_id: int, ttype: str, job: dict ) -> None: pct = job["progress"]["percent"] await _upsert_test(db, drive_id, ttype, { "state": "running", "percent": pct, "truenas_job_id": job["id"], "started_at": job.get("time_started"), "eta_at": _eta_from_progress(pct, job.get("time_started")), "finished_at": None, "error_text": None, }) async def _sync_history( db: aiosqlite.Connection, client: TrueNASClient, drive_id: int, devname: str, ttype: str, ) -> None: """Pull most recent completed test from history. This is only called when the drive+type is NOT in the active running-jobs dict, so it's safe to overwrite any previous 'running' state — the job has finished (or was never started). """ try: results = await client.get_smart_results(devname) except Exception: return # History fetch failure is non-fatal if not results: return for test in results[0].get("tests", []): t_name = test.get("type", "").lower() is_short = "short" in t_name if (ttype == "short") != is_short: continue # Wrong test type state = _map_history_state(test.get("status", "")) await _upsert_test(db, drive_id, ttype, { "state": state, "percent": 100 if state == "passed" else 0, "truenas_job_id": None, "started_at": None, "eta_at": None, "finished_at": None, "error_text": test.get("status_verbose") if state == "failed" else None, }) break # Most recent only # --------------------------------------------------------------------------- # Poll cycle # --------------------------------------------------------------------------- async def poll_cycle(client: TrueNASClient) -> int: """Run one full poll. Returns number of drives seen.""" now = _now() disks = await client.get_disks() running_jobs = await client.get_smart_jobs(state="RUNNING") # Index running jobs by (devname, test_type) active: dict[tuple[str, str], dict] = {} for job in running_jobs: try: args = job["arguments"][0] devname = args["disks"][0] ttype = args["type"].lower() active[(devname, ttype)] = job except (KeyError, IndexError, TypeError): pass async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA foreign_keys=ON") for disk in disks: devname = disk["devname"] drive_id = await _upsert_drive(db, disk, now) for ttype in ("short", "long"): if (devname, ttype) in active: await _apply_running_job(db, drive_id, ttype, active[(devname, ttype)]) else: await _sync_history(db, client, drive_id, devname, ttype) await db.commit() return len(disks) # --------------------------------------------------------------------------- # Background loop # --------------------------------------------------------------------------- async def run(client: TrueNASClient) -> None: log.info("Poller started", extra={"poll_interval": settings.poll_interval_seconds}) cycle = 0 while True: try: count = await poll_cycle(client) cycle += 1 _state["last_poll_at"] = _now() _state["last_error"] = None _state["healthy"] = True _state["drives_seen"] = count _state["consecutive_failures"] = 0 log.debug("Poll OK", extra={"drives": count}) _notify_subscribers() # Check for stuck jobs every 5 cycles (~1 min at default 12s interval) if cycle % 5 == 0: try: from app import burnin as _burnin await _burnin.check_stuck_jobs() except Exception as exc: log.error("Stuck-job check failed: %s", exc) except Exception as exc: failures = _state["consecutive_failures"] + 1 _state["consecutive_failures"] = failures _state["last_error"] = str(exc) _state["healthy"] = False if failures >= 5: log.critical( "Poller has failed %d consecutive times: %s", failures, exc, extra={"consecutive_failures": failures}, ) else: log.error("Poll failed: %s", exc, extra={"consecutive_failures": failures}) await asyncio.sleep(settings.poll_interval_seconds)