nas-burnin/app/poller.py
Brandon Walter d4c0770b9e feat: app-level login + hardening sweep (1.0.0-22 -> 1.0.0-23)
Two layered changes shipped in this branch:

== 1.0.0-22: app-level authentication ==

The dashboard previously had only an IP allowlist. Adds username +
bcrypt password auth, signed-cookie sessions, and a "first user setup"
flow.

* New app/auth.py: User dataclass, bcrypt hash/verify, get_user_by_id/
  username, create_user, touch_last_login, FastAPI `get_current_user`
  dependency. Session secret loaded from SESSION_SECRET env or persisted
  to /data/session_secret.
* New app/auth_cli.py: `python -m app.auth_cli list|reset|add` for
  out-of-band user management. Passwords always read from a TTY prompt.
* Schema: idempotent ALTER for `users` table (id, username unique,
  password_hash, full_name, is_admin, created_at, last_login_at).
* main.py: SessionMiddleware (HMAC-signed cookie, max-age 7 days,
  SameSite=strict — see hardening section) + _AuthGateMiddleware that
  populates request.state.current_user and bounces unauth'd HTML GETs
  to /login while returning 401 JSON for everything else.
* Routes: GET /login renders the first-user-setup form when the users
  table is empty, otherwise the sign-in form; POST /login;
  POST /api/v1/auth/setup (only works while the table is empty);
  GET|POST /logout.
* Bootstrap: env vars INITIAL_ADMIN_USERNAME + INITIAL_ADMIN_PASSWORD
  create the first admin on startup if both set AND users table empty.
  Ignored thereafter — change passwords via UI or CLI.
* Layout: header shows current_user.full_name|username + Logout link.
  Modal operator field auto-fills from the logged-in user via
  <meta name="default-operator"> rendered in layout (replaces the
  localStorage-only previous behaviour).
* requirements.txt: pinned bcrypt>=4.0,<5.0, itsdangerous>=2.1,
  python-multipart>=0.0.7. First step toward addressing the
  unpinned-deps gotcha.
* New app/templates/login.html with first-user-setup variant.
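
The session-secret lookup described above (SESSION_SECRET env first,
otherwise a value persisted under /data) could look roughly like the
sketch below. This is not the code in app/auth.py: the function name
`load_session_secret`, the 32-byte hex secret, and the 0600 file mode
are assumptions made for illustration.

```python
import os
import secrets
from pathlib import Path


def load_session_secret(path: Path, env_var: str = "SESSION_SECRET") -> str:
    """Return the session-signing secret (hypothetical helper).

    Order of precedence: environment variable, then the persisted file,
    then a freshly generated value that is written to the file.
    """
    from_env = os.environ.get(env_var)
    if from_env:
        return from_env

    if path.exists():
        stored = path.read_text().strip()
        if stored:
            return stored

    # Assumption: 32 random bytes, hex-encoded (64 characters).
    secret = secrets.token_hex(32)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(secret)
    path.chmod(0o600)  # assumption: keep the file readable by the owner only
    return secret
```

Persisting the generated value means signed cookies survive a container
restart even when SESSION_SECRET is not set.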

== 1.0.0-23: hardening sweep ==

Closes the eight-item gap audit:

* DB retention + automated backup. New app/retention.py runs daily at
  03:00 local. Nulls burnin_stages.log_text on stages older than
  retention_log_days (default 35), VACUUMs to reclaim pages, then runs
  `sqlite3 .backup` to /data/backups/app-YYYY-MM-DD.db keeping the
  retention_backup_keep most recent (default 14). Wired into the
  lifespan supervisor next to mailer/poller.

* CSRF mitigation. SessionMiddleware bumped to SameSite=strict so the
  browser refuses to send the session cookie on cross-site POSTs —
  removes the actual CSRF vector. Trade-off: external links into the
  app require re-auth.

* Login rate limiting. In-memory per-username AND per-source-IP failure
  counters in auth.py. 10 failures within 10 min trips a 15-min lockout
  for both keys. Returns HTTP 429 with a clear "try again in N min"
  message. Cleared on successful login.

* Login audit events. New event types in audit_events: user_login,
  user_login_failed, user_login_locked_out, user_logout,
  user_password_changed. All include source IP. Recorded via
  auth.audit_auth_event().

* Password change UI. Header link "Change password" opens
  templates/components/modal_password.html (current/new/confirm).
  Posts to POST /api/v1/auth/change-password — bcrypt-verifies current,
  requires >=8 char new pw, writes audit event.

* NVMe burn-in path. _stage_surface_validate now detects nvme*
  devnames and routes to _stage_surface_validate_nvme() which runs
  `nvme format -s 1 --force` (user-data erase; nvme-cli uses `-s 2`
  for cryptographic erase). Takes seconds instead of the hours
  badblocks needs, and exercises the controller's secure-erase path.
  Falls back to badblocks if nvme-cli isn't installed. Post-format
  SMART check.

* Mounted-FS detection. ssh_client.get_mounted_drives() runs
  `findmnt -no SOURCE`, parses non-ZFS sources back to base devnames.
  Poller treats them as pool_name='(mounted)', pool_role='mounted'.
  Confirm token DESTROY MOUNTED FILESYSTEM, distinct purple styling,
  audit event mounted_drive_unlocked, daily-report banner picks it up.

* Deeper /health. Real readiness check — DB write probe (PRAGMA
  journal_mode), poller freshness (age <= 3x stale_threshold), SSH
  test_connection() when configured. Returns 503 when any check fails
  so a proxy/orchestrator can take the container out of rotation.
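
The login rate-limiting rule above (10 failures within 10 min trips a
15-min lockout for both keys, cleared on successful login) can be
sketched as follows. This is an illustration, not the code in auth.py:
the class name `LoginRateLimiter`, its method names, and the
"user:"/"ip:" key format are all hypothetical.

```python
import time
from collections import defaultdict, deque

MAX_FAILURES = 10          # failures allowed inside the window
WINDOW_SECONDS = 10 * 60   # sliding window length
LOCKOUT_SECONDS = 15 * 60  # lockout applied once the limit is hit


class LoginRateLimiter:
    """In-memory failure counters keyed by arbitrary strings (hypothetical)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._failures = defaultdict(deque)  # key -> failure timestamps
        self._locked_until = {}              # key -> lockout expiry time

    def seconds_locked(self, *keys: str) -> float:
        """Longest remaining lockout among keys, or 0.0 if none is active."""
        now = self._clock()
        remaining = [self._locked_until.get(k, 0.0) - now for k in keys]
        return max(0.0, max(remaining, default=0.0))

    def record_failure(self, *keys: str) -> None:
        """Count one failed attempt against every key; lock all if any trips."""
        now = self._clock()
        tripped = False
        for key in keys:
            window = self._failures[key]
            window.append(now)
            # Drop failures that have aged out of the sliding window.
            while window and now - window[0] > WINDOW_SECONDS:
                window.popleft()
            if len(window) >= MAX_FAILURES:
                tripped = True
        if tripped:
            for key in keys:
                self._locked_until[key] = now + LOCKOUT_SECONDS

    def record_success(self, *keys: str) -> None:
        """Clear counters and lockouts after a successful login."""
        for key in keys:
            self._failures.pop(key, None)
            self._locked_until.pop(key, None)
```

A login handler would call `seconds_locked("user:alice", "ip:10.0.0.5")`
before verifying the password and answer HTTP 429 when the result is
positive; rounding the value up to whole minutes gives the N for the
"try again in N min" message.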

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:08:29 -04:00


"""
Polling loop — fetches TrueNAS state every POLL_INTERVAL_SECONDS and
normalizes it into SQLite.
Design notes:
- Opens its own DB connection per cycle (WAL allows concurrent readers).
- Skips a cycle if TrueNAS is unreachable; marks poller unhealthy.
- Never overwrites a 'running' state with stale history.
"""
import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import Any
import aiosqlite
from app.config import settings
from app.truenas import TrueNASClient
log = logging.getLogger(__name__)
# Shared state read by the /health endpoint and dashboard template
_state: dict[str, Any] = {
    "last_poll_at": None,
    "last_error": None,
    "healthy": False,
    "drives_seen": 0,
    "consecutive_failures": 0,
    "system_temps": {},  # {"cpu_c": int|None, "pch_c": int|None}
    "thermal_pressure": "ok",  # "ok" | "warn" | "crit" - based on running burn-in drive temps
}

# SSE subscriber queues - notified after each successful poll
_subscribers: list[asyncio.Queue] = []


def get_state() -> dict:
    return _state.copy()


def subscribe() -> asyncio.Queue:
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    _subscribers.append(q)
    return q


def unsubscribe(q: asyncio.Queue) -> None:
    try:
        _subscribers.remove(q)
    except ValueError:
        pass


def _notify_subscribers(alert: dict | None = None) -> None:
    payload = {"alert": alert}
    for q in list(_subscribers):
        try:
            q.put_nowait(payload)
        except asyncio.QueueFull:
            pass  # Client is behind; skip this update


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
def _eta_from_progress(percent: int, started_iso: str | None) -> str | None:
    """Linear ETA extrapolation from elapsed time and percent complete."""
    if not started_iso or percent <= 0:
        return None
    try:
        start = datetime.fromisoformat(started_iso)
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)
        elapsed = (datetime.now(timezone.utc) - start).total_seconds()
        total_est = elapsed / (percent / 100)
        remaining = max(0.0, total_est - elapsed)
        return (datetime.now(timezone.utc) + timedelta(seconds=remaining)).isoformat()
    except Exception:
        return None


def _map_history_state(status: str) -> str:
    return "passed" if "without error" in status.lower() else "failed"
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
async def _upsert_drive(db: aiosqlite.Connection, disk: dict, now: str,
                        pool_info: dict | None = None,
                        update_pool: bool = True) -> int:
    """Insert/update a drive row.

    pool_info: {"pool": str, "role": str} if this drive is currently in a
    zpool, else None. None values clear pool columns so a removed-from-pool
    drive doesn't stay locked.

    update_pool: when False, pool columns are preserved on conflict and
    initialised to NULL on insert. Callers pass False on detection failure
    so a transient SSH outage doesn't silently unlock every drive.
    """
    pool_name = pool_info["pool"] if pool_info else None
    pool_role = pool_info["role"] if pool_info else None
    pool_seen_at = now if pool_info else None
    if update_pool:
        update_clause = """
            devname = excluded.devname,
            serial = excluded.serial,
            model = excluded.model,
            size_bytes = excluded.size_bytes,
            temperature_c = excluded.temperature_c,
            smart_health = excluded.smart_health,
            last_seen_at = excluded.last_seen_at,
            last_polled_at = excluded.last_polled_at,
            pool_name = excluded.pool_name,
            pool_role = excluded.pool_role,
            pool_seen_at = excluded.pool_seen_at
        """
    else:
        # Preserve pool_name / pool_role / pool_seen_at - detection failed
        # this cycle, so we have no fresh data and must not overwrite.
        update_clause = """
            devname = excluded.devname,
            serial = excluded.serial,
            model = excluded.model,
            size_bytes = excluded.size_bytes,
            temperature_c = excluded.temperature_c,
            smart_health = excluded.smart_health,
            last_seen_at = excluded.last_seen_at,
            last_polled_at = excluded.last_polled_at
        """
    await db.execute(
        f"""
        INSERT INTO drives
            (truenas_disk_id, devname, serial, model, size_bytes,
             temperature_c, smart_health, last_seen_at, last_polled_at,
             pool_name, pool_role, pool_seen_at)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
        ON CONFLICT(truenas_disk_id) DO UPDATE SET
            {update_clause}
        """,
        (
            disk["identifier"],
            disk["devname"],
            disk.get("serial"),
            disk.get("model"),
            disk.get("size"),
            disk.get("temperature"),
            disk.get("smart_health", "UNKNOWN"),
            now,
            now,
            pool_name,
            pool_role,
            pool_seen_at,
        ),
    )
    cur = await db.execute(
        "SELECT id FROM drives WHERE truenas_disk_id = ?", (disk["identifier"],)
    )
    row = await cur.fetchone()
    return row["id"]
async def _upsert_test(db: aiosqlite.Connection, drive_id: int, ttype: str, data: dict) -> None:
    await db.execute(
        """
        INSERT INTO smart_tests
            (drive_id, test_type, state, percent, truenas_job_id,
             started_at, eta_at, finished_at, error_text)
        VALUES (?,?,?,?,?,?,?,?,?)
        ON CONFLICT(drive_id, test_type) DO UPDATE SET
            state = excluded.state,
            percent = excluded.percent,
            truenas_job_id = excluded.truenas_job_id,
            started_at = COALESCE(excluded.started_at, smart_tests.started_at),
            eta_at = excluded.eta_at,
            finished_at = excluded.finished_at,
            error_text = excluded.error_text
        """,
        (
            drive_id,
            ttype,
            data["state"],
            data.get("percent", 0),
            data.get("truenas_job_id"),
            data.get("started_at"),
            data.get("eta_at"),
            data.get("finished_at"),
            data.get("error_text"),
        ),
    )


async def _apply_running_job(
    db: aiosqlite.Connection, drive_id: int, ttype: str, job: dict
) -> None:
    pct = job["progress"]["percent"]
    await _upsert_test(db, drive_id, ttype, {
        "state": "running",
        "percent": pct,
        "truenas_job_id": job["id"],
        "started_at": job.get("time_started"),
        "eta_at": _eta_from_progress(pct, job.get("time_started")),
        "finished_at": None,
        "error_text": None,
    })
async def _sync_history(
    db: aiosqlite.Connection,
    client: TrueNASClient,
    drive_id: int,
    devname: str,
    ttype: str,
) -> None:
    """Pull most recent completed test from history.

    This is only called when the drive+type is NOT in the active running-jobs
    dict, so it's safe to overwrite any previous 'running' state - the job
    has finished (or was never started).
    """
    try:
        results = await client.get_smart_results(devname)
    except Exception:
        return  # History fetch failure is non-fatal
    if not results:
        return
    for test in results[0].get("tests", []):
        t_name = test.get("type", "").lower()
        is_short = "short" in t_name
        if (ttype == "short") != is_short:
            continue  # Wrong test type
        state = _map_history_state(test.get("status", ""))
        await _upsert_test(db, drive_id, ttype, {
            "state": state,
            "percent": 100 if state == "passed" else 0,
            "truenas_job_id": None,
            "started_at": None,
            "eta_at": None,
            "finished_at": None,
            "error_text": test.get("status_verbose") if state == "failed" else None,
        })
        break  # Most recent only
# ---------------------------------------------------------------------------
# Poll cycle
# ---------------------------------------------------------------------------
async def _poll_smart_via_ssh(db: aiosqlite.Connection, now: str) -> None:
    """
    Poll progress for SMART tests started via SSH (truenas_job_id IS NULL).
    Used on TrueNAS SCALE 25.10+ where the REST smart/test API no longer exists.
    """
    from app import ssh_client

    if not ssh_client.is_configured():
        return
    cur = await db.execute(
        """SELECT st.id, st.test_type, st.drive_id, d.devname, st.started_at
           FROM smart_tests st
           JOIN drives d ON d.id = st.drive_id
           WHERE st.state = 'running' AND st.truenas_job_id IS NULL"""
    )
    rows = await cur.fetchall()
    if not rows:
        return
    for row in rows:
        test_id, ttype, drive_id, devname, started_at = row[0], row[1], row[2], row[3], row[4]
        try:
            progress = await ssh_client.poll_smart_progress(devname)
        except Exception as exc:
            log.warning("SSH SMART poll failed for %s: %s", devname, exc)
            continue
        state = progress["state"]
        pct_remaining = progress.get("percent_remaining")  # None = not yet in output
        raw_output = progress.get("output", "")
        if state == "running":
            # pct_remaining=None means smartctl output doesn't have the % line yet
            # (test just started) - keep percent at 0 rather than jumping to 100
            if pct_remaining is None:
                pct = 0
            else:
                pct = max(0, 100 - pct_remaining)
            eta = _eta_from_progress(pct, started_at)
            await db.execute(
                "UPDATE smart_tests SET percent=?, eta_at=?, raw_output=? WHERE id=?",
                (pct, eta, raw_output, test_id),
            )
        elif state == "passed":
            await db.execute(
                "UPDATE smart_tests SET state='passed', percent=100, finished_at=?, raw_output=? WHERE id=?",
                (now, raw_output, test_id),
            )
            log.info("SSH SMART %s passed on %s", ttype, devname)
        elif state == "failed":
            await db.execute(
                "UPDATE smart_tests SET state='failed', percent=0, finished_at=?, "
                "error_text=?, raw_output=? WHERE id=?",
                (now, f"SMART {ttype.upper()} test failed", raw_output, test_id),
            )
            log.warning("SSH SMART %s FAILED on %s", ttype, devname)
        # state == "unknown" → keep polling, no update
    await db.commit()
async def poll_cycle(client: TrueNASClient) -> int:
    """Run one full poll. Returns number of drives seen."""
    now = _now()
    disks = await client.get_disks()
    running_jobs = await client.get_smart_jobs(state="RUNNING")

    # Fetch temperatures via SCALE-specific endpoint.
    # CORE doesn't have this endpoint - silently skip on any error.
    try:
        temps = await client.get_disk_temperatures()
    except Exception:
        temps = {}

    # Inject temperature into each disk dict (SCALE 25.10 has no temp in /disk)
    for disk in disks:
        devname = disk.get("devname", "")
        t = temps.get(devname)
        if t is not None:
            disk["temperature"] = int(round(t))

    # SMART health - TrueNAS /api/v2.0/disk doesn't expose smart_health,
    # so without this every drive defaults to UNKNOWN forever (only burn-in
    # stages used to populate it). Run `smartctl -H` over a single SSH
    # session for every drive every Nth cycle. Cache between cycles via
    # _state so the dashboard always renders the most recent answer.
    SMART_HEALTH_EVERY_N_CYCLES = 5  # ~1 minute at default 12s interval
    _state.setdefault("smart_health_cache", {})
    cycle_n = _state.get("cycle", 0) + 1
    _state["cycle"] = cycle_n
    try:
        from app import ssh_client as _ssh
        if _ssh.is_configured() and (cycle_n % SMART_HEALTH_EVERY_N_CYCLES == 1):
            health_map = await _ssh.get_smart_health_map(
                [d["devname"] for d in disks if d.get("devname")]
            )
            if health_map is not None:
                _state["smart_health_cache"] = health_map
    except Exception as exc:
        log.warning("smart_health refresh failed: %s", exc)
    health_cache = _state.get("smart_health_cache") or {}
    for disk in disks:
        devname = disk.get("devname", "")
        h = health_cache.get(devname)
        if h:
            disk["smart_health"] = h

    # Pool membership map - drives in any zpool are locked from burn-in.
    # ssh_client returns None on failure (distinct from {} which means "no
    # pools"). If EITHER detection call fails we fail-closed: leave
    # pool_name / pool_role columns alone so previously-locked drives stay
    # locked, and previously-unlocked drives stay unlocked, until detection
    # recovers. Treating a transient SSH blip as "no pool members" would
    # silently unlock every drive on the next poll.
    detection_ok = True
    pool_map: dict = {}
    zfs_member_set: set = set()
    mounted_set: set = set()
    try:
        from app import ssh_client as _ssh
        if _ssh.is_configured():
            pm = await _ssh.get_pool_membership()
            zs = await _ssh.get_zfs_member_drives()
            ms = await _ssh.get_mounted_drives()
            if pm is None or zs is None or ms is None:
                detection_ok = False
            else:
                pool_map = pm
                zfs_member_set = zs
                mounted_set = ms
        # SSH unconfigured (mock/dev mode) - detection_ok stays True with
        # empty maps, so dev mode never artificially locks drives.
    except Exception:
        detection_ok = False
    if not detection_ok:
        log.warning(
            "Pool detection failed this cycle; preserving existing "
            "pool_name/pool_role columns. Locked drives stay locked, "
            "unlocked drives stay unlocked, until SSH recovers."
        )
    if detection_ok:
        # Drives carrying ZFS labels but not in any active pool are
        # "exported" - same hazard as an active pool member, so lock them
        # too. We don't know the original pool name without
        # `zpool import`-style scanning (slow + blocks); display
        # "(exported)" and use a special token.
        for devname in zfs_member_set:
            if devname not in pool_map:
                pool_map[devname] = {"pool": "(exported)", "role": "exported"}
        # Drives with a non-ZFS mount somewhere (XFS/ext4/scratch/etc.)
        # also lock - wiping a mounted FS is just as catastrophic. Lower
        # precedence than active pool membership, since a drive in `tank`
        # would also show under findmnt for the pool's mountpoint via
        # /dev/zd* or zvol - but those are filtered in the parser.
        for devname in mounted_set:
            if devname not in pool_map:
                pool_map[devname] = {"pool": "(mounted)", "role": "mounted"}

    # Index running jobs by (devname, test_type)
    active: dict[tuple[str, str], dict] = {}
    for job in running_jobs:
        try:
            args = job["arguments"][0]
            devname = args["disks"][0]
            ttype = args["type"].lower()
            active[(devname, ttype)] = job
        except (KeyError, IndexError, TypeError):
            pass

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")
        for disk in disks:
            devname = disk["devname"]
            drive_id = await _upsert_drive(
                db, disk, now,
                pool_map.get(devname) if detection_ok else None,
                update_pool=detection_ok,
            )
            for ttype in ("short", "long"):
                if (devname, ttype) in active:
                    await _apply_running_job(db, drive_id, ttype, active[(devname, ttype)])
                else:
                    await _sync_history(db, client, drive_id, devname, ttype)
        await db.commit()
        # SSH SMART polling - for tests started via smartctl (no TrueNAS REST job)
        await _poll_smart_via_ssh(db, now)
    return len(disks)
# ---------------------------------------------------------------------------
# Background loop
# ---------------------------------------------------------------------------
async def run(client: TrueNASClient) -> None:
    log.info("Poller started", extra={"poll_interval": settings.poll_interval_seconds})
    cycle = 0
    while True:
        try:
            count = await poll_cycle(client)
            cycle += 1
            _state["last_poll_at"] = _now()
            _state["last_error"] = None
            _state["healthy"] = True
            _state["drives_seen"] = count
            _state["consecutive_failures"] = 0
            log.debug("Poll OK", extra={"drives": count})

            # System sensor temps via SSH (non-fatal)
            from app import ssh_client as _ssh
            if _ssh.is_configured():
                try:
                    _state["system_temps"] = await _ssh.get_system_sensors()
                except Exception:
                    pass

            # Thermal pressure: max temp of drives currently under burn-in
            try:
                async with aiosqlite.connect(settings.db_path) as _tdb:
                    _tdb.row_factory = aiosqlite.Row
                    await _tdb.execute("PRAGMA journal_mode=WAL")
                    _cur = await _tdb.execute("""
                        SELECT MAX(d.temperature_c)
                        FROM drives d
                        JOIN burnin_jobs bj ON bj.drive_id = d.id
                        WHERE bj.state = 'running' AND d.temperature_c IS NOT NULL
                    """)
                    _row = await _cur.fetchone()
                    _max_t = _row[0] if _row and _row[0] is not None else None
                    if _max_t is None:
                        _state["thermal_pressure"] = "ok"
                    elif _max_t >= settings.temp_crit_c:
                        _state["thermal_pressure"] = "crit"
                    elif _max_t >= settings.temp_warn_c:
                        _state["thermal_pressure"] = "warn"
                    else:
                        _state["thermal_pressure"] = "ok"
            except Exception:
                _state["thermal_pressure"] = "ok"

            _notify_subscribers()

            # Check for stuck jobs every 5 cycles (~1 min at default 12s interval)
            if cycle % 5 == 0:
                try:
                    from app import burnin as _burnin
                    await _burnin.check_stuck_jobs()
                except Exception as exc:
                    log.error("Stuck-job check failed: %s", exc)
        except Exception as exc:
            failures = _state["consecutive_failures"] + 1
            _state["consecutive_failures"] = failures
            _state["last_error"] = str(exc)
            _state["healthy"] = False
            if failures >= 5:
                log.critical(
                    "Poller has failed %d consecutive times: %s",
                    failures, exc,
                    extra={"consecutive_failures": failures},
                )
            else:
                log.error("Poll failed: %s", exc, extra={"consecutive_failures": failures})
        await asyncio.sleep(settings.poll_interval_seconds)
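
The subscriber queues in this module are created with maxsize=1, so a
slow client misses intermediate updates rather than accumulating a
backlog. The standalone sketch below reproduces that pattern and shows
how a consumer (for example an SSE endpoint) might use it. `subscribe`
and `unsubscribe` mirror the functions above and `notify` mirrors
`_notify_subscribers`; `consume` and `demo` are hypothetical names added
for illustration.

```python
import asyncio
from typing import Optional

_subscribers: list = []  # one asyncio.Queue per connected client


def subscribe() -> asyncio.Queue:
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    _subscribers.append(q)
    return q


def unsubscribe(q: asyncio.Queue) -> None:
    try:
        _subscribers.remove(q)
    except ValueError:
        pass


def notify(alert: Optional[dict] = None) -> None:
    payload = {"alert": alert}
    for q in list(_subscribers):
        try:
            q.put_nowait(payload)
        except asyncio.QueueFull:
            pass  # client still holds an unread update; drop this one


async def consume(n_events: int) -> list:
    """Hypothetical consumer: collect n_events payloads, then unsubscribe."""
    q = subscribe()
    received = []
    try:
        while len(received) < n_events:
            received.append(await q.get())
    finally:
        unsubscribe(q)  # always deregister, even on cancellation
    return received


async def demo() -> tuple:
    task = asyncio.create_task(consume(1))
    await asyncio.sleep(0)       # let the consumer register its queue
    notify({"level": "warn"})    # fills the single queue slot
    notify({"level": "crit"})    # dropped: the slot is still occupied
    events = await task
    return events, len(_subscribers)
```

Running `asyncio.run(demo())` delivers only the first payload. The
second is discarded because the consumer has not yet drained its queue,
which is the trade-off the maxsize=1 design accepts.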