nas-burnin/app/poller.py
Brandon Walter 5da1a1704f feat: pool-membership lock + cancellation hardening + smart_health refresh + tunables (1.0.0-13 -> 1.0.0-21)
Substantial feature + reliability sweep. Each version below was developed,
tested live against the maple/TrueNAS deployment, and Codex-reviewed
before bundling.

1.0.0-13 — asyncssh proc.kill() doesn't actually kill the remote process
  (sshd ignores SSH signal-channel requests by default), so a cancel of a
  long-running badblocks left the remote process running and proc.wait()
  hanging — pinning the asyncio.Semaphore slot forever.

  * Wrap long-lived commands in `sh -c 'echo PID:$$; exec <cmd>'` to
    capture the remote PID; store in burnin._remote_pids[job_id].
  * burnin._kill_remote_process(job_id) opens a fresh SSH session and
    issues `kill -9 <pid>` — sshd honours that.
  * Bound proc.wait() with asyncio.wait_for(timeout=15).
  * burnin._active_tasks tracks every _run_job task so cancel_job and
    check_stuck_jobs can actually cancel the asyncio task (was DB-only
    before). Also fixes the documented asyncio.create_task GC gotcha
    (weak refs only).
  * _run_job finalizer reads current state and skips the write if state
    != 'running' so cancelled/unknown aren't clobbered.

1.0.0-14 — poller._upsert_drive ON CONFLICT only refreshed temperature/
  health/poll timestamps; devname/serial/model/size_bytes were stuck at
  first-INSERT values forever. After kernel SCSI re-enumeration two
  drives could both show as `sda`. Fixed by updating all six fields.
  Also added 7-day stale filter to _DRIVES_QUERY so removed drives drop
  off the dashboard while audit/burnin_jobs FKs stay intact.

1.0.0-15/-16 — pool-membership lock.
  * ssh_client.get_pool_membership() runs `zpool list -vHP` and parses
    the flattened TrueNAS output (container vdevs + their device children
    both appear at depth 1; section markers cache/log/spare/special/dedup
    switch the role).
  * ssh_client.get_zfs_member_drives() runs `lsblk -no NAME,FSTYPE -l`
    to detect drives carrying ZFS labels not in any active pool — they
    get pool_name='(exported)', pool_role='exported'.
  * Three idempotent ALTER TABLE migrations on drives:
    pool_name/pool_role/pool_seen_at.
  * burnin.start_job raises PoolMemberError if pool_name IS NOT NULL and
    the drive isn't in burnin._unlock_grants. Routes layer maps to 409
    with structured detail {pool_name, pool_role, pool_locked: true} so
    the frontend can render an unlock affordance.
  * POST /api/v1/drives/{id}/unlock accepts {confirm_token, operator,
    reason}. Token is the pool name for active pools, "DESTROY BOOT POOL"
    for boot-pool, "DESTROY EXPORTED POOL" for exported. Reason >= 5
    chars. TTL = UNLOCK_TTL_SECONDS = 600. Audit event types:
    pool_drive_unlocked / boot_pool_drive_unlocked /
    exported_pool_drive_unlocked.
  * Grants are in-memory only — container restart wipes them.
  * UI: lock icon (yellow/red/orange), pool pill, conditional Unlock vs
    Burn-In button. modal_unlock.html with type-to-confirm field.
    Live unlock countdown via tickUnlockCountdowns() in app.js.
  * Daily report: red banner listing every unlock event from the last
    24h, with operator + reason + timestamp.

1.0.0-17 — Codex review fail-open + XSS + structured-error fixes.
  * ssh_client.get_pool_membership / get_zfs_member_drives now return
    None on failure (vs {} for 'definitely empty'). poller passes
    update_pool=False to _upsert_drive on detection failure, preserving
    existing pool columns instead of clearing them. Without this fix a
    1-second SSH blip silently unlocked every drive.
  * mailer._build_unlock_banner_html escapes every interpolated field
    via html.escape() (was '<' only). Time filter switched to
    julianday() — string >= against datetime('now', '-1 day') compared
    formats with different separators ('T' vs ' ') and timezone
    suffixes, causing subtle off-by-N-hour inclusion.
  * app.js submitStart/submitBatchStart now detect the structured
    pool_locked 409 detail and auto-open the unlock modal for the
    offending drive (was [object Object] in toast).

1.0.0-18 — Codex grant-binding + commit-ordering fixes.
  * Unlock grants bound to the (pool_name, pool_role) observed at unlock
    time. _UnlockGrant dataclass; _is_unlocked and unlock_expiry
    invalidate the grant if the live row's pool identity has changed.
    Prevents an 'exported' unlock from carrying over when the drive
    turns out to be in active 'tank' or 'boot-pool'.
  * grant_pool_unlock now writes to _unlock_grants only AFTER db.commit()
    succeeds — previously a failed audit insert left an unaudited grant
    armed.

1.0.0-19 — Codex race + cancellation classification + test scaffold.
  * Partial unique index uniq_active_burnin_per_drive ON burnin_jobs
    (drive_id) WHERE state IN ('queued','running'). INSERT now wraps in
    try/except aiosqlite.IntegrityError -> ValueError so the read-then-
    insert race in start_job can't produce two queued rows for the same
    drive.
  * _run_job tracks was_cancelled flag; on bare task.cancel() (shutdown,
    future code paths) where DB state is still 'running', finalizer
    writes 'unknown' instead of mis-classifying as 'failed'.
  * tests/ stdlib unittest scaffold:
    - test_pool_parser.py (21 tests): mirror/raidz/draid container vdevs,
      single-disk depth-1, plural section markers, partition stripping,
      sdaa-style names, multi-pool, role reset between pools.
    - test_unlock_flow.py (18 tests): token validation per pool kind,
      identity-binding invalidation, TTL expiry, audit-commit-then-arm
      ordering, unique-active-burnin partial index.
    Run via `python -m unittest discover tests/`. No new dependencies.

1.0.0-20 — Spearfoot-inspired badblocks tunables.
  * surface_validate_block_size (-b, default 4096), surface_validate_
    block_buffer (-c, default 64), surface_validate_passes (-p, default
    1) exposed in Settings UI; persist via settings_store.json.
    Validation: block size must be a power of 2 between 512 and
    1048576. Defaults preserve existing behaviour. Bumping to 8192/64/1
    roughly halves runtime on multi-TB HDDs at ~2x RAM cost.

1.0.0-21 — SMART overall-health column actually populated.
  * /api/v2.0/disk doesn't expose smart_health, so every drive defaulted
    to UNKNOWN forever (only burn-in stages ever wrote a real value).
  * ssh_client.get_smart_health_map([devnames]) runs `smartctl -H` for
    all drives in a single SSH session, deterministically delimited with
    @@devname@@ ... @@END@@ markers. Returns {devname: PASSED|FAILED|
    UNKNOWN} or None on SSH failure.
  * poller calls it every 5th cycle (~1 min at default 12s interval),
    caches in _state['smart_health_cache'] so transient failures preserve
    the previous values.
  * Dashboard CSS: col-smart min-width 150 -> 95, horizontal padding 14
    -> 6 so Short/Long SMART columns fit comfortably on a 13-inch
    display.
  * 5 additional parser tests (44 total, all passing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 09:25:56 -04:00

520 lines
19 KiB
Python

"""
Polling loop — fetches TrueNAS state every POLL_INTERVAL_SECONDS and
normalizes it into SQLite.
Design notes:
- Opens its own DB connection per cycle (WAL allows concurrent readers).
- Skips a cycle if TrueNAS is unreachable; marks poller unhealthy.
- Never overwrites a 'running' state with stale history.
"""
import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import Any
import aiosqlite
from app.config import settings
from app.truenas import TrueNASClient
log = logging.getLogger(__name__)
# Shared state read by the /health endpoint and dashboard template
_state: dict[str, Any] = {
"last_poll_at": None,
"last_error": None,
"healthy": False,
"drives_seen": 0,
"consecutive_failures": 0,
"system_temps": {}, # {"cpu_c": int|None, "pch_c": int|None}
"thermal_pressure": "ok", # "ok" | "warn" | "crit" — based on running burn-in drive temps
}
# SSE subscriber queues — notified after each successful poll
_subscribers: list[asyncio.Queue] = []
def get_state() -> dict:
return _state.copy()
def subscribe() -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=1)
_subscribers.append(q)
return q
def unsubscribe(q: asyncio.Queue) -> None:
try:
_subscribers.remove(q)
except ValueError:
pass
def _notify_subscribers(alert: dict | None = None) -> None:
payload = {"alert": alert}
for q in list(_subscribers):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
pass # Client is behind; skip this update
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _eta_from_progress(percent: int, started_iso: str | None) -> str | None:
"""Linear ETA extrapolation from elapsed time and percent complete."""
if not started_iso or percent <= 0:
return None
try:
start = datetime.fromisoformat(started_iso)
if start.tzinfo is None:
start = start.replace(tzinfo=timezone.utc)
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
total_est = elapsed / (percent / 100)
remaining = max(0.0, total_est - elapsed)
return (datetime.now(timezone.utc) + timedelta(seconds=remaining)).isoformat()
except Exception:
return None
def _map_history_state(status: str) -> str:
return "passed" if "without error" in status.lower() else "failed"
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
async def _upsert_drive(db: aiosqlite.Connection, disk: dict, now: str,
pool_info: dict | None = None,
update_pool: bool = True) -> int:
"""Insert/update a drive row.
pool_info: {"pool": str, "role": str} if this drive is currently in a
zpool, else None. None values clear pool columns so a removed-from-pool
drive doesn't stay locked.
update_pool: when False, pool columns are preserved on conflict and
initialised to NULL on insert. Callers pass False on detection failure
so a transient SSH outage doesn't silently unlock every drive.
"""
pool_name = pool_info["pool"] if pool_info else None
pool_role = pool_info["role"] if pool_info else None
pool_seen_at = now if pool_info else None
if update_pool:
update_clause = """
devname = excluded.devname,
serial = excluded.serial,
model = excluded.model,
size_bytes = excluded.size_bytes,
temperature_c = excluded.temperature_c,
smart_health = excluded.smart_health,
last_seen_at = excluded.last_seen_at,
last_polled_at = excluded.last_polled_at,
pool_name = excluded.pool_name,
pool_role = excluded.pool_role,
pool_seen_at = excluded.pool_seen_at
"""
else:
# Preserve pool_name / pool_role / pool_seen_at — detection failed
# this cycle, so we have no fresh data and must not overwrite.
update_clause = """
devname = excluded.devname,
serial = excluded.serial,
model = excluded.model,
size_bytes = excluded.size_bytes,
temperature_c = excluded.temperature_c,
smart_health = excluded.smart_health,
last_seen_at = excluded.last_seen_at,
last_polled_at = excluded.last_polled_at
"""
await db.execute(
f"""
INSERT INTO drives
(truenas_disk_id, devname, serial, model, size_bytes,
temperature_c, smart_health, last_seen_at, last_polled_at,
pool_name, pool_role, pool_seen_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(truenas_disk_id) DO UPDATE SET
{update_clause}
""",
(
disk["identifier"],
disk["devname"],
disk.get("serial"),
disk.get("model"),
disk.get("size"),
disk.get("temperature"),
disk.get("smart_health", "UNKNOWN"),
now,
now,
pool_name,
pool_role,
pool_seen_at,
),
)
cur = await db.execute(
"SELECT id FROM drives WHERE truenas_disk_id = ?", (disk["identifier"],)
)
row = await cur.fetchone()
return row["id"]
async def _upsert_test(db: aiosqlite.Connection, drive_id: int, ttype: str, data: dict) -> None:
await db.execute(
"""
INSERT INTO smart_tests
(drive_id, test_type, state, percent, truenas_job_id,
started_at, eta_at, finished_at, error_text)
VALUES (?,?,?,?,?,?,?,?,?)
ON CONFLICT(drive_id, test_type) DO UPDATE SET
state = excluded.state,
percent = excluded.percent,
truenas_job_id = excluded.truenas_job_id,
started_at = COALESCE(excluded.started_at, smart_tests.started_at),
eta_at = excluded.eta_at,
finished_at = excluded.finished_at,
error_text = excluded.error_text
""",
(
drive_id,
ttype,
data["state"],
data.get("percent", 0),
data.get("truenas_job_id"),
data.get("started_at"),
data.get("eta_at"),
data.get("finished_at"),
data.get("error_text"),
),
)
async def _apply_running_job(
db: aiosqlite.Connection, drive_id: int, ttype: str, job: dict
) -> None:
pct = job["progress"]["percent"]
await _upsert_test(db, drive_id, ttype, {
"state": "running",
"percent": pct,
"truenas_job_id": job["id"],
"started_at": job.get("time_started"),
"eta_at": _eta_from_progress(pct, job.get("time_started")),
"finished_at": None,
"error_text": None,
})
async def _sync_history(
db: aiosqlite.Connection,
client: TrueNASClient,
drive_id: int,
devname: str,
ttype: str,
) -> None:
"""Pull most recent completed test from history.
This is only called when the drive+type is NOT in the active running-jobs
dict, so it's safe to overwrite any previous 'running' state — the job
has finished (or was never started).
"""
try:
results = await client.get_smart_results(devname)
except Exception:
return # History fetch failure is non-fatal
if not results:
return
for test in results[0].get("tests", []):
t_name = test.get("type", "").lower()
is_short = "short" in t_name
if (ttype == "short") != is_short:
continue # Wrong test type
state = _map_history_state(test.get("status", ""))
await _upsert_test(db, drive_id, ttype, {
"state": state,
"percent": 100 if state == "passed" else 0,
"truenas_job_id": None,
"started_at": None,
"eta_at": None,
"finished_at": None,
"error_text": test.get("status_verbose") if state == "failed" else None,
})
break # Most recent only
# ---------------------------------------------------------------------------
# Poll cycle
# ---------------------------------------------------------------------------
async def _poll_smart_via_ssh(db: aiosqlite.Connection, now: str) -> None:
"""
Poll progress for SMART tests started via SSH (truenas_job_id IS NULL).
Used on TrueNAS SCALE 25.10+ where the REST smart/test API no longer exists.
"""
from app import ssh_client
if not ssh_client.is_configured():
return
cur = await db.execute(
"""SELECT st.id, st.test_type, st.drive_id, d.devname, st.started_at
FROM smart_tests st
JOIN drives d ON d.id = st.drive_id
WHERE st.state = 'running' AND st.truenas_job_id IS NULL"""
)
rows = await cur.fetchall()
if not rows:
return
for row in rows:
test_id, ttype, drive_id, devname, started_at = row[0], row[1], row[2], row[3], row[4]
try:
progress = await ssh_client.poll_smart_progress(devname)
except Exception as exc:
log.warning("SSH SMART poll failed for %s: %s", devname, exc)
continue
state = progress["state"]
pct_remaining = progress.get("percent_remaining") # None = not yet in output
raw_output = progress.get("output", "")
if state == "running":
# pct_remaining=None means smartctl output doesn't have the % line yet
# (test just started) — keep percent at 0 rather than jumping to 100
if pct_remaining is None:
pct = 0
else:
pct = max(0, 100 - pct_remaining)
eta = _eta_from_progress(pct, started_at)
await db.execute(
"UPDATE smart_tests SET percent=?, eta_at=?, raw_output=? WHERE id=?",
(pct, eta, raw_output, test_id),
)
elif state == "passed":
await db.execute(
"UPDATE smart_tests SET state='passed', percent=100, finished_at=?, raw_output=? WHERE id=?",
(now, raw_output, test_id),
)
log.info("SSH SMART %s passed on %s", ttype, devname)
elif state == "failed":
await db.execute(
"UPDATE smart_tests SET state='failed', percent=0, finished_at=?, "
"error_text=?, raw_output=? WHERE id=?",
(now, f"SMART {ttype.upper()} test failed", raw_output, test_id),
)
log.warning("SSH SMART %s FAILED on %s", ttype, devname)
# state == "unknown" → keep polling, no update
await db.commit()
async def poll_cycle(client: TrueNASClient) -> int:
"""Run one full poll. Returns number of drives seen."""
now = _now()
disks = await client.get_disks()
running_jobs = await client.get_smart_jobs(state="RUNNING")
# Fetch temperatures via SCALE-specific endpoint.
# CORE doesn't have this endpoint — silently skip on any error.
try:
temps = await client.get_disk_temperatures()
except Exception:
temps = {}
# Inject temperature into each disk dict (SCALE 25.10 has no temp in /disk)
for disk in disks:
devname = disk.get("devname", "")
t = temps.get(devname)
if t is not None:
disk["temperature"] = int(round(t))
# SMART health — TrueNAS /api/v2.0/disk doesn't expose smart_health,
# so without this every drive defaults to UNKNOWN forever (only burn-in
# stages used to populate it). Run `smartctl -H` over a single SSH
# session for every drive every Nth cycle. Cache between cycles via
# _state so the dashboard always renders the most recent answer.
SMART_HEALTH_EVERY_N_CYCLES = 5 # ~1 minute at default 12s interval
_state.setdefault("smart_health_cache", {})
cycle_n = _state.get("cycle", 0) + 1
_state["cycle"] = cycle_n
try:
from app import ssh_client as _ssh
if _ssh.is_configured() and (cycle_n % SMART_HEALTH_EVERY_N_CYCLES == 1):
health_map = await _ssh.get_smart_health_map(
[d["devname"] for d in disks if d.get("devname")]
)
if health_map is not None:
_state["smart_health_cache"] = health_map
except Exception as exc:
log.warning("smart_health refresh failed: %s", exc)
health_cache = _state.get("smart_health_cache") or {}
for disk in disks:
devname = disk.get("devname", "")
h = health_cache.get(devname)
if h:
disk["smart_health"] = h
# Pool membership map — drives in any zpool are locked from burn-in.
# ssh_client returns None on failure (distinct from {} which means "no
# pools"). If EITHER detection call fails we fail-closed: leave
# pool_name / pool_role columns alone so previously-locked drives stay
# locked, and previously-unlocked drives stay unlocked, until detection
# recovers. Treating a transient SSH blip as "no pool members" would
# silently unlock every drive on the next poll.
detection_ok = True
pool_map: dict = {}
zfs_member_set: set = set()
try:
from app import ssh_client as _ssh
if _ssh.is_configured():
pm = await _ssh.get_pool_membership()
zs = await _ssh.get_zfs_member_drives()
if pm is None or zs is None:
detection_ok = False
else:
pool_map = pm
zfs_member_set = zs
# SSH unconfigured (mock/dev mode) — detection_ok stays True with
# empty maps, so dev mode never artificially locks drives.
except Exception:
detection_ok = False
if not detection_ok:
log.warning(
"Pool detection failed this cycle — preserving existing "
"pool_name/pool_role columns. Locked drives stay locked, "
"unlocked drives stay unlocked, until SSH recovers."
)
if detection_ok:
# Drives carrying ZFS labels but not in any active pool are
# "exported" — same hazard as an active pool member, so lock them
# too. We don't know the original pool name without
# `zpool import`-style scanning (slow + blocks); display
# "(exported)" and use a special token.
for devname in zfs_member_set:
if devname not in pool_map:
pool_map[devname] = {"pool": "(exported)", "role": "exported"}
# Index running jobs by (devname, test_type)
active: dict[tuple[str, str], dict] = {}
for job in running_jobs:
try:
args = job["arguments"][0]
devname = args["disks"][0]
ttype = args["type"].lower()
active[(devname, ttype)] = job
except (KeyError, IndexError, TypeError):
pass
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
await db.execute("PRAGMA journal_mode=WAL")
await db.execute("PRAGMA foreign_keys=ON")
for disk in disks:
devname = disk["devname"]
drive_id = await _upsert_drive(
db, disk, now,
pool_map.get(devname) if detection_ok else None,
update_pool=detection_ok,
)
for ttype in ("short", "long"):
if (devname, ttype) in active:
await _apply_running_job(db, drive_id, ttype, active[(devname, ttype)])
else:
await _sync_history(db, client, drive_id, devname, ttype)
await db.commit()
# SSH SMART polling — for tests started via smartctl (no TrueNAS REST job)
await _poll_smart_via_ssh(db, now)
return len(disks)
# ---------------------------------------------------------------------------
# Background loop
# ---------------------------------------------------------------------------
async def run(client: TrueNASClient) -> None:
log.info("Poller started", extra={"poll_interval": settings.poll_interval_seconds})
cycle = 0
while True:
try:
count = await poll_cycle(client)
cycle += 1
_state["last_poll_at"] = _now()
_state["last_error"] = None
_state["healthy"] = True
_state["drives_seen"] = count
_state["consecutive_failures"] = 0
log.debug("Poll OK", extra={"drives": count})
# System sensor temps via SSH (non-fatal)
from app import ssh_client as _ssh
if _ssh.is_configured():
try:
_state["system_temps"] = await _ssh.get_system_sensors()
except Exception:
pass
# Thermal pressure: max temp of drives currently under burn-in
try:
async with aiosqlite.connect(settings.db_path) as _tdb:
_tdb.row_factory = aiosqlite.Row
await _tdb.execute("PRAGMA journal_mode=WAL")
_cur = await _tdb.execute("""
SELECT MAX(d.temperature_c)
FROM drives d
JOIN burnin_jobs bj ON bj.drive_id = d.id
WHERE bj.state = 'running' AND d.temperature_c IS NOT NULL
""")
_row = await _cur.fetchone()
_max_t = _row[0] if _row and _row[0] is not None else None
if _max_t is None:
_state["thermal_pressure"] = "ok"
elif _max_t >= settings.temp_crit_c:
_state["thermal_pressure"] = "crit"
elif _max_t >= settings.temp_warn_c:
_state["thermal_pressure"] = "warn"
else:
_state["thermal_pressure"] = "ok"
except Exception:
_state["thermal_pressure"] = "ok"
_notify_subscribers()
# Check for stuck jobs every 5 cycles (~1 min at default 12s interval)
if cycle % 5 == 0:
try:
from app import burnin as _burnin
await _burnin.check_stuck_jobs()
except Exception as exc:
log.error("Stuck-job check failed: %s", exc)
except Exception as exc:
failures = _state["consecutive_failures"] + 1
_state["consecutive_failures"] = failures
_state["last_error"] = str(exc)
_state["healthy"] = False
if failures >= 5:
log.critical(
"Poller has failed %d consecutive times: %s",
failures, exc,
extra={"consecutive_failures": failures},
)
else:
log.error("Poll failed: %s", exc, extra={"consecutive_failures": failures})
await asyncio.sleep(settings.poll_interval_seconds)