nas-burnin/app/routes.py
Brandon Walter d4c0770b9e feat: app-level login + hardening sweep (1.0.0-22 -> 1.0.0-23)
Two layered changes shipped in this branch:

== 1.0.0-22: app-level authentication ==

The dashboard previously had only an IP allowlist. Adds username +
bcrypt password auth, signed-cookie sessions, and a "first user setup"
flow.

* New app/auth.py: User dataclass, bcrypt hash/verify, get_user_by_id/
  username, create_user, touch_last_login, FastAPI `get_current_user`
  dependency. Session secret loaded from SESSION_SECRET env or persisted
  to /data/session_secret.
* New app/auth_cli.py: `python -m app.auth_cli list|reset|add` for
  out-of-band user management. Passwords always read from a TTY prompt.
* Schema: idempotent ALTER for `users` table (id, username unique,
  password_hash, full_name, is_admin, created_at, last_login_at).
* main.py: SessionMiddleware (HMAC-signed cookie, max-age 7 days,
  SameSite=strict — see hardening section) + _AuthGateMiddleware that
  populates request.state.current_user and bounces unauth'd HTML GETs
  to /login while returning 401 JSON for everything else.
* Routes: GET /login renders first-user-setup form when users table is
  empty otherwise sign-in form; POST /login; POST /api/v1/auth/setup
  (only works while empty); GET|POST /logout.
* Bootstrap: env vars INITIAL_ADMIN_USERNAME + INITIAL_ADMIN_PASSWORD
  create the first admin on startup if both set AND users table empty.
  Ignored thereafter — change passwords via UI or CLI.
* Layout: header shows current_user.full_name|username + Logout link.
  Modal operator field auto-fills from the logged-in user via
  <meta name="default-operator"> rendered in layout (replaces the
  localStorage-only previous behaviour).
* requirements.txt: pinned bcrypt>=4.0,<5.0, itsdangerous>=2.1,
  python-multipart>=0.0.7. First step toward addressing the
  unpinned-deps gotcha.
* New app/templates/login.html with first-user-setup variant.

== 1.0.0-23: hardening sweep ==

Closes the eight-item gap audit:

* DB retention + automated backup. New app/retention.py runs daily at
  03:00 local. Nulls burnin_stages.log_text on stages older than
  retention_log_days (default 35), VACUUMs to reclaim pages, then runs
  `sqlite3 .backup` to /data/backups/app-YYYY-MM-DD.db keeping the
  retention_backup_keep most recent (default 14). Wired into the
  lifespan supervisor next to mailer/poller.

* CSRF mitigation. SessionMiddleware bumped to SameSite=strict so the
  browser refuses to send the session cookie on cross-site POSTs —
  removes the actual CSRF vector. Trade-off: external links into the
  app require re-auth.

* Login rate limiting. In-memory per-username AND per-source-IP failure
  counters in auth.py. 10 failures within 10 min trips a 15-min lockout
  for both keys. Returns HTTP 429 with a clear "try again in N min"
  message. Cleared on successful login.

* Login audit events. New event types in audit_events: user_login,
  user_login_failed, user_login_locked_out, user_logout,
  user_password_changed. All include source IP. Recorded via
  auth.audit_auth_event().

* Password change UI. Header link "Change password" opens
  templates/components/modal_password.html (current/new/confirm).
  Posts to POST /api/v1/auth/change-password — bcrypt-verifies current,
  requires >=8 char new pw, writes audit event.

* NVMe burn-in path. _stage_surface_validate now detects nvme*
  devnames and routes to _stage_surface_validate_nvme() which runs
  `nvme format -s 1 --force` (cryptographic erase). Seconds vs hours
  of badblocks, exercises the controller's secure-erase. Falls back
  to badblocks if nvme-cli isn't installed. Post-format SMART check.

* Mounted-FS detection. ssh_client.get_mounted_drives() runs
  `findmnt -no SOURCE`, parses non-ZFS sources back to base devnames.
  Poller treats them as pool_name='(mounted)', pool_role='mounted'.
  Confirm token DESTROY MOUNTED FILESYSTEM, distinct purple styling,
  audit event mounted_drive_unlocked, daily-report banner picks it up.

* Deeper /health. Real readiness check — DB write probe (PRAGMA
  journal_mode), poller freshness (age <= 3x stale_threshold), SSH
  test_connection() when configured. Returns 503 when any check fails
  so a proxy/orchestrator can take the container out of rotation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:08:29 -04:00

1418 lines
51 KiB
Python

import asyncio
import csv
import io
import json
from datetime import datetime, timezone
import aiosqlite
from fastapi import APIRouter, Depends, HTTPException, Query, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from sse_starlette.sse import EventSourceResponse
from app import auth, burnin, mailer, poller, settings_store
from app.config import settings
from app.database import get_db
from app.models import (
BurninJobResponse, BurninStageResponse,
CancelBurninRequest, DriveResponse,
SmartTestState, StartBurninRequest, UnlockPoolDriveRequest,
UpdateDriveRequest,
)
from app.renderer import templates
router = APIRouter()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _eta_seconds(eta_at: str | None) -> int | None:
if not eta_at:
return None
try:
eta_ts = datetime.fromisoformat(eta_at)
if eta_ts.tzinfo is None:
eta_ts = eta_ts.replace(tzinfo=timezone.utc)
remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds()
return max(0, int(remaining))
except Exception:
return None
def _is_stale(last_polled_at: str) -> bool:
try:
last = datetime.fromisoformat(last_polled_at)
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - last).total_seconds() > settings.stale_threshold_seconds
except Exception:
return True
def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None:
"""Linear ETA extrapolation from started_at and percent complete."""
if not started_at or percent <= 0:
return None
try:
start = datetime.fromisoformat(started_at)
if start.tzinfo is None:
start = start.replace(tzinfo=timezone.utc)
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
total_est = elapsed / (percent / 100)
remaining = max(0, int(total_est - elapsed))
return remaining
except Exception:
return None
def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
eta_at = row[f"{prefix}_eta_at"]
return SmartTestState(
state=row[f"{prefix}_state"] or "idle",
percent=row[f"{prefix}_percent"],
eta_seconds=_eta_seconds(eta_at),
eta_timestamp=eta_at,
started_at=row[f"{prefix}_started_at"],
finished_at=row[f"{prefix}_finished_at"],
error_text=row[f"{prefix}_error"],
)
def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
return DriveResponse(
id=row["id"],
devname=row["devname"],
serial=row["serial"],
model=row["model"],
size_bytes=row["size_bytes"],
temperature_c=row["temperature_c"],
smart_health=row["smart_health"] or "UNKNOWN",
last_polled_at=row["last_polled_at"],
is_stale=_is_stale(row["last_polled_at"]),
smart_short=_build_smart(row, "short"),
smart_long=_build_smart(row, "long"),
notes=row["notes"],
location=row["location"],
pool_name=row["pool_name"],
pool_role=row["pool_role"],
pool_unlocked_until=burnin.unlock_expiry(
row["id"], row["pool_name"], row["pool_role"],
),
)
def _compute_status(drive: dict) -> str:
short = (drive.get("smart_short") or {}).get("state", "idle")
long_ = (drive.get("smart_long") or {}).get("state", "idle")
health = drive.get("smart_health", "UNKNOWN")
if "running" in (short, long_):
return "running"
if short == "failed" or long_ == "failed" or health == "FAILED":
return "failed"
if "passed" in (short, long_):
return "passed"
return "idle"
_DRIVES_QUERY = """
SELECT
d.id, d.devname, d.serial, d.model, d.size_bytes,
d.temperature_c, d.smart_health, d.last_polled_at,
d.notes, d.location, d.pool_name, d.pool_role,
s.state AS short_state,
s.percent AS short_percent,
s.started_at AS short_started_at,
s.eta_at AS short_eta_at,
s.finished_at AS short_finished_at,
s.error_text AS short_error,
l.state AS long_state,
l.percent AS long_percent,
l.started_at AS long_started_at,
l.eta_at AS long_eta_at,
l.finished_at AS long_finished_at,
l.error_text AS long_error
FROM drives d
LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
WHERE d.last_seen_at >= datetime('now', '-7 days')
{where}
ORDER BY d.devname
"""
async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
"""Return latest burn-in job (any state) keyed by drive_id.
Jobs created before the drive's last_reset_at are excluded so the
dashboard burn-in column clears after a reset while history is preserved.
"""
cur = await db.execute("""
SELECT bj.*
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at)
""")
rows = await cur.fetchall()
return {r["drive_id"]: dict(r) for r in rows}
async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
cur = await db.execute(_DRIVES_QUERY.format(where=""))
rows = await cur.fetchall()
burnin_by_drive = await _fetch_burnin_by_drive(db)
# For burn-ins that include SMART stages, fetch those stages so we can
# mirror their progress/result in the Short/Long SMART columns.
# This covers both running stages (showing live progress) and completed
# stages (showing passed/failed after the burn-in moves to the next stage).
bi_smart_stages: dict[int, dict[str, dict]] = {} # job_id -> {stage_name: row}
bi_ids_with_smart = [
bi["id"] for bi in burnin_by_drive.values()
if bi["state"] in ("running", "queued")
]
if bi_ids_with_smart:
placeholders = ",".join("?" * len(bi_ids_with_smart))
cur = await db.execute(f"""
SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent,
bs.started_at, bs.finished_at, bs.error_text
FROM burnin_stages bs
WHERE bs.burnin_job_id IN ({placeholders})
AND bs.stage_name IN ('short_smart', 'long_smart')
AND bs.state IN ('running', 'passed', 'failed')
""", bi_ids_with_smart)
for r in await cur.fetchall():
bi_smart_stages.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r)
drives = []
for row in rows:
d = _row_to_drive(row).model_dump()
d["status"] = _compute_status(d)
bi = burnin_by_drive.get(d["id"])
d["burnin"] = bi
# Overlay burn-in SMART stage progress/results onto the SMART columns
if bi and bi["id"] in bi_smart_stages:
for stage_name, stage in bi_smart_stages[bi["id"]].items():
target = "smart_short" if stage_name == "short_smart" else "smart_long"
# Only overlay if the standalone SMART column is idle/empty
existing = d.get(target) or {}
if existing.get("state") not in (None, "idle"):
continue
pct = stage["percent"] or 0
d[target] = {
"state": stage["state"],
"percent": pct if stage["state"] == "running" else (100 if stage["state"] == "passed" else 0),
"eta_seconds": _compute_eta_seconds(stage["started_at"], pct) if stage["state"] == "running" else None,
"eta_timestamp": None,
"started_at": stage["started_at"],
"finished_at": stage["finished_at"],
"error_text": stage["error_text"],
}
drives.append(d)
return drives
def _stale_context(poller_state: dict) -> dict:
last = poller_state.get("last_poll_at")
if not last:
return {"stale": False, "stale_seconds": 0}
try:
dt = datetime.fromisoformat(last)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
elapsed = int((datetime.now(timezone.utc) - dt).total_seconds())
stale = elapsed > settings.stale_threshold_seconds
return {"stale": stale, "stale_seconds": elapsed}
except Exception:
return {"stale": False, "stale_seconds": 0}
# ---------------------------------------------------------------------------
# Auth — login / logout / first-user setup
# ---------------------------------------------------------------------------
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, next: str = "/", error: str | None = None):
needs_setup = (await auth.user_count()) == 0
return templates.TemplateResponse(request, "login.html", {
"request": request,
"needs_setup": needs_setup,
"error": error,
"next": next if next.startswith("/") else "/",
})
def _client_ip(request: Request) -> str:
fwd = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip()
return fwd or (request.client.host if request.client else "unknown")
@router.post("/login")
async def login_submit(request: Request):
form = await request.form()
username = (form.get("username") or "").strip()
password = form.get("password") or ""
next_url = form.get("next") or "/"
if not next_url.startswith("/"):
next_url = "/"
ip = _client_ip(request)
# Rate-limit gate — checked BEFORE bcrypt so an attacker can't burn CPU.
locked_until = auth.login_locked_until(username, ip)
if locked_until is not None:
remaining = int(locked_until - __import__("time").time())
return templates.TemplateResponse(request, "login.html", {
"request": request,
"needs_setup": False,
"error": f"Too many failed attempts. Try again in {remaining // 60} min.",
"next": next_url,
}, status_code=429)
found = await auth.get_user_by_username(username)
if not found or not auth.verify_password(password, found[1]):
# Constant-ish-time: still call verify on a junk hash if user missing
# so the timing of "user not found" matches "wrong password."
if not found:
auth.verify_password(password, "$2b$12$" + "x" * 53)
tripped = auth.record_login_failure(username, ip)
await auth.audit_auth_event(
"user_login_locked_out" if tripped else "user_login_failed",
username,
f"Failed login from {ip}" + (
f" — IP/user locked out for {auth.LOGIN_LOCKOUT_SECONDS // 60} min"
if tripped else ""
),
)
return templates.TemplateResponse(request, "login.html", {
"request": request,
"needs_setup": False,
"error": "Invalid username or password.",
"next": next_url,
}, status_code=401)
user = found[0]
auth.clear_login_failures(username, ip)
request.session["user_id"] = user.id
request.session["username"] = user.username
await auth.touch_last_login(user.id)
await auth.audit_auth_event(
"user_login", user.username, f"Signed in from {ip}"
)
return RedirectResponse(url=next_url, status_code=303)
@router.post("/api/v1/auth/setup")
async def auth_first_user_setup(request: Request):
"""Create the first admin from the login page when the users table is
empty. Public endpoint — but only does anything when zero users exist."""
if (await auth.user_count()) > 0:
raise HTTPException(status_code=409, detail="Users already exist.")
form = await request.form()
username = (form.get("username") or "").strip()
password = form.get("password") or ""
full_name = (form.get("full_name") or "").strip() or None
try:
user = await auth.create_user(username, password, full_name, is_admin=True)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
request.session["user_id"] = user.id
request.session["username"] = user.username
await auth.touch_last_login(user.id)
return RedirectResponse(url="/", status_code=303)
@router.get("/logout")
@router.post("/logout")
async def logout(request: Request):
user = request.state.current_user if hasattr(request.state, "current_user") else None
if user:
await auth.audit_auth_event(
"user_logout", user.username, f"Signed out from {_client_ip(request)}"
)
request.session.clear()
return RedirectResponse(url="/login", status_code=303)
@router.post("/api/v1/auth/change-password")
async def change_password(request: Request):
user = request.state.current_user if hasattr(request.state, "current_user") else None
if not user:
raise HTTPException(status_code=401, detail="Authentication required")
form = await request.form()
current = form.get("current_password") or ""
new_pw = form.get("new_password") or ""
confirm = form.get("confirm_password") or ""
if new_pw != confirm:
raise HTTPException(status_code=400, detail="New passwords do not match.")
try:
await auth.change_password(user.id, current, new_pw)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
await auth.audit_auth_event(
"user_password_changed", user.username,
f"Password changed from {_client_ip(request)}",
)
return {"ok": True}
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, db: aiosqlite.Connection = Depends(get_db)):
drives = await _fetch_drives_for_template(db)
ps = poller.get_state()
return templates.TemplateResponse(request, "dashboard.html", {
"request": request,
"drives": drives,
"poller": ps,
**_stale_context(ps),
})
# ---------------------------------------------------------------------------
# SSE — live drive table updates
# ---------------------------------------------------------------------------
@router.get("/sse/drives")
async def sse_drives(request: Request):
q = poller.subscribe()
async def generate():
try:
while True:
# Wait for next poll notification or keepalive timeout
try:
payload = await asyncio.wait_for(q.get(), timeout=25.0)
except asyncio.TimeoutError:
if await request.is_disconnected():
break
yield {"event": "keepalive", "data": ""}
continue
if await request.is_disconnected():
break
# Extract alert from payload (may be None for regular polls)
alert = None
if isinstance(payload, dict):
alert = payload.get("alert")
# Render fresh table HTML
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
await db.execute("PRAGMA journal_mode=WAL")
drives = await _fetch_drives_for_template(db)
html = templates.env.get_template(
"components/drives_table.html"
).render(drives=drives)
yield {"event": "drives-update", "data": html}
# Push system sensor state so JS can update temp chips live
ps = poller.get_state()
yield {
"event": "system-sensors",
"data": json.dumps({
"system_temps": ps.get("system_temps", {}),
"thermal_pressure": ps.get("thermal_pressure", "ok"),
"temp_warn_c": settings.temp_warn_c,
"temp_crit_c": settings.temp_crit_c,
}),
}
# Push browser notification event if this was a job completion
if alert:
yield {"event": "job-alert", "data": json.dumps(alert)}
finally:
poller.unsubscribe(q)
return EventSourceResponse(generate())
# ---------------------------------------------------------------------------
# JSON API
# ---------------------------------------------------------------------------
@router.get("/health")
async def health(db: aiosqlite.Connection = Depends(get_db)):
"""Real readiness check, not just process-is-running.
Verifies (a) DB writable, (b) poller has succeeded recently relative
to the configured stale_threshold_seconds, (c) SSH reachable when
configured. Returns 503 when any check fails so a proxy/orchestrator
health probe can take the container out of rotation.
"""
from datetime import datetime, timezone
from fastapi.responses import JSONResponse
from app import ssh_client as _ssh
checks: dict[str, dict] = {}
# DB probe — confirm the journal is healthy (PRAGMA reads journal_mode
# and would fail loudly if WAL is wedged or the file is unreadable).
try:
cur = await db.execute("PRAGMA journal_mode")
await cur.fetchone()
checks["db"] = {"ok": True}
except Exception as exc:
checks["db"] = {"ok": False, "error": str(exc)}
ps = poller.get_state()
last = ps.get("last_poll_at")
poll_age = None
if last:
try:
t = datetime.fromisoformat(last)
if t.tzinfo is None:
t = t.replace(tzinfo=timezone.utc)
poll_age = (datetime.now(timezone.utc) - t).total_seconds()
except Exception:
poll_age = None
poll_ok = ps.get("healthy") and (
poll_age is None or poll_age <= settings.stale_threshold_seconds * 3
)
checks["poller"] = {
"ok": bool(poll_ok),
"last_error": ps.get("last_error"),
"last_poll_at": last,
"age_seconds": int(poll_age) if poll_age is not None else None,
}
# SSH probe — only when configured. Cheap (single sensors -j).
if _ssh.is_configured():
try:
r = await _ssh.test_connection()
checks["ssh"] = {"ok": bool(r.get("ok")),
"error": r.get("error")}
except Exception as exc:
checks["ssh"] = {"ok": False, "error": str(exc)}
else:
checks["ssh"] = {"ok": True, "skipped": True}
cur = await db.execute("SELECT COUNT(*) FROM drives")
row = await cur.fetchone()
drives_tracked = row[0] if row else 0
status_ok = all(c["ok"] for c in checks.values())
body = {
"status": "ok" if status_ok else "degraded",
"checks": checks,
"drives_tracked": drives_tracked,
"poll_interval_s": settings.poll_interval_seconds,
"version": settings.app_version,
}
return JSONResponse(body, status_code=200 if status_ok else 503)
@router.get("/api/v1/drives", response_model=list[DriveResponse])
async def list_drives(db: aiosqlite.Connection = Depends(get_db)):
cur = await db.execute(_DRIVES_QUERY.format(where=""))
rows = await cur.fetchall()
return [_row_to_drive(r) for r in rows]
@router.get("/api/v1/drives/{drive_id}/drawer")
async def drive_drawer(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
"""Data for the log drawer — latest burn-in job + stages, SMART tests, audit events."""
cur = await db.execute(_DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Drive not found")
drive = _row_to_drive(row)
# Latest burn-in job + its stages (include log_text and bad_blocks)
cur = await db.execute(
"SELECT * FROM burnin_jobs WHERE drive_id=? ORDER BY id DESC LIMIT 1",
(drive_id,),
)
job_row = await cur.fetchone()
burnin = None
if job_row:
job = dict(job_row)
cur = await db.execute(
"SELECT id, stage_name, state, percent, started_at, finished_at, "
"duration_seconds, error_text, log_text, bad_blocks "
"FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
(job_row["id"],),
)
job["stages"] = [dict(r) for r in await cur.fetchall()]
burnin = job
# SMART raw output from smart_tests table
cur = await db.execute(
"SELECT test_type, state, percent, started_at, finished_at, error_text, raw_output "
"FROM smart_tests WHERE drive_id=?",
(drive_id,),
)
smart_rows = {r["test_type"]: dict(r) for r in await cur.fetchall()}
# Cached SMART attributes (JSON blob on drives table)
import json as _json
smart_attrs = None
cur = await db.execute("SELECT smart_attrs FROM drives WHERE id=?", (drive_id,))
attrs_row = await cur.fetchone()
if attrs_row and attrs_row["smart_attrs"]:
try:
smart_attrs = _json.loads(attrs_row["smart_attrs"])
except Exception:
pass
# Last 50 audit events for this drive (newest first)
cur = await db.execute("""
SELECT id, event_type, operator, message, created_at
FROM audit_events
WHERE drive_id = ?
ORDER BY id DESC
LIMIT 50
""", (drive_id,))
events = [dict(r) for r in await cur.fetchall()]
def _smart_card(test_type: str) -> dict:
smart_obj = drive.smart_short if test_type == "short" else drive.smart_long
base = smart_obj.model_dump() if smart_obj else {}
row = smart_rows.get(test_type, {})
base["raw_output"] = row.get("raw_output")
return base
return {
"drive": {
"id": drive.id,
"devname": drive.devname,
"serial": drive.serial,
"model": drive.model,
"size_bytes": drive.size_bytes,
},
"burnin": burnin,
"smart": {
"short": _smart_card("short"),
"long": _smart_card("long"),
"attrs": smart_attrs,
},
"events": events,
}
@router.get("/api/v1/drives/{drive_id}", response_model=DriveResponse)
async def get_drive(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
cur = await db.execute(
_DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,)
)
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Drive not found")
return _row_to_drive(row)
@router.post("/api/v1/drives/{drive_id}/smart/start")
async def smart_start(
drive_id: int,
body: dict,
db: aiosqlite.Connection = Depends(get_db),
):
"""Start a standalone SHORT or LONG SMART test on a single drive.
Uses SSH (smartctl) when configured — required for TrueNAS SCALE 25.10+
where the REST smart/test endpoint no longer exists.
Falls back to TrueNAS REST API for older versions.
"""
from app import burnin as _burnin, ssh_client
test_type = (body.get("type") or "").upper()
if test_type not in ("SHORT", "LONG"):
raise HTTPException(status_code=422, detail="type must be SHORT or LONG")
cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Drive not found")
devname = row[0]
now = datetime.now(timezone.utc).isoformat()
ttype_lower = test_type.lower()
if ssh_client.is_configured():
# SSH path — works on TrueNAS SCALE 25.10+ and CORE
try:
output = await ssh_client.start_smart_test(devname, test_type)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"SSH error: {exc}")
# Mark as running in DB (truenas_job_id=NULL signals SSH-managed test)
# Store smartctl start output as proof the test was initiated
await db.execute(
"""INSERT INTO smart_tests (drive_id, test_type, state, percent, started_at, raw_output)
VALUES (?,?,?,?,?,?)
ON CONFLICT(drive_id, test_type) DO UPDATE SET
state='running', percent=0, truenas_job_id=NULL,
started_at=excluded.started_at, finished_at=NULL, error_text=NULL,
raw_output=excluded.raw_output""",
(drive_id, ttype_lower, "running", 0, now, output),
)
await db.commit()
from app import poller as _poller
_poller._notify_subscribers()
return {"devname": devname, "type": test_type, "message": output[:200]}
else:
# REST path — older TrueNAS CORE / SCALE versions
client = _burnin._client
if client is None:
raise HTTPException(status_code=503, detail="TrueNAS client not ready")
try:
tn_job_id = await client.start_smart_test([devname], test_type)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}")
return {"job_id": tn_job_id, "devname": devname, "type": test_type}
@router.post("/api/v1/drives/{drive_id}/smart/cancel")
async def smart_cancel(
drive_id: int,
body: dict,
db: aiosqlite.Connection = Depends(get_db),
):
"""Cancel a running standalone SMART test on a drive."""
from app import burnin as _burnin
test_type = (body.get("type") or "").lower()
if test_type not in ("short", "long"):
raise HTTPException(status_code=422, detail="type must be 'short' or 'long'")
cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Drive not found")
devname = row[0]
client = _burnin._client
if client is None:
raise HTTPException(status_code=503, detail="TrueNAS client not ready")
from app import ssh_client
if ssh_client.is_configured():
# SSH path — abort via smartctl -X
try:
await ssh_client.abort_smart_test(devname)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"SSH abort error: {exc}")
else:
# REST path — find TrueNAS job and abort it
try:
jobs = await client.get_smart_jobs()
tn_job_id = None
for j in jobs:
if j.get("state") != "RUNNING":
continue
args = j.get("arguments", [])
if not args or not isinstance(args[0], dict):
continue
if devname in args[0].get("disks", []):
tn_job_id = j["id"]
break
if tn_job_id is None:
raise HTTPException(status_code=404, detail="No running SMART test found for this drive")
await client.abort_job(tn_job_id)
except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}")
# Update local DB state
now = datetime.now(timezone.utc).isoformat()
await db.execute(
"UPDATE smart_tests SET state='aborted', finished_at=? WHERE drive_id=? AND test_type=? AND state='running'",
(now, drive_id, test_type),
)
await db.commit()
return {"cancelled": True, "devname": devname, "type": test_type}
# ---------------------------------------------------------------------------
# Burn-in API
# ---------------------------------------------------------------------------
def _row_to_burnin(row: aiosqlite.Row, stages: list[aiosqlite.Row]) -> BurninJobResponse:
return BurninJobResponse(
id=row["id"],
drive_id=row["drive_id"],
profile=row["profile"],
state=row["state"],
percent=row["percent"] or 0,
stage_name=row["stage_name"],
operator=row["operator"],
created_at=row["created_at"],
started_at=row["started_at"],
finished_at=row["finished_at"],
error_text=row["error_text"],
stages=[
BurninStageResponse(
id=s["id"],
stage_name=s["stage_name"],
state=s["state"],
percent=s["percent"] or 0,
started_at=s["started_at"],
finished_at=s["finished_at"],
error_text=s["error_text"],
)
for s in stages
],
)
@router.post("/api/v1/burnin/start")
async def burnin_start(req: StartBurninRequest):
results = []
errors = []
for drive_id in req.drive_ids:
try:
job_id = await burnin.start_job(
drive_id, req.profile, req.operator, stage_order=req.stage_order
)
results.append({"drive_id": drive_id, "job_id": job_id})
except burnin.PoolMemberError as exc:
errors.append({
"drive_id": drive_id,
"error": str(exc),
"pool_name": exc.pool_name,
"pool_role": exc.pool_role,
"pool_locked": True,
})
except ValueError as exc:
errors.append({"drive_id": drive_id, "error": str(exc)})
if errors and not results:
# Surface the first error's structured fields so the UI can render
# an unlock affordance instead of a generic toast.
raise HTTPException(status_code=409, detail=errors[0])
return {"queued": results, "errors": errors}
@router.post("/api/v1/drives/{drive_id}/unlock")
async def unlock_pool_drive(drive_id: int, req: UnlockPoolDriveRequest):
try:
expiry = await burnin.grant_pool_unlock(
drive_id, req.confirm_token, req.operator, req.reason,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
return {"unlocked": True, "expires_at": expiry,
"ttl_seconds": burnin.UNLOCK_TTL_SECONDS}
@router.post("/api/v1/burnin/{job_id}/cancel")
async def burnin_cancel(job_id: int, req: CancelBurninRequest):
ok = await burnin.cancel_job(job_id, req.operator)
if not ok:
raise HTTPException(status_code=409, detail="Job not found or not cancellable")
return {"cancelled": True}
# ---------------------------------------------------------------------------
# History pages
# ---------------------------------------------------------------------------
_PAGE_SIZE = 50
_ALL_STATES = ("queued", "running", "passed", "failed", "cancelled", "unknown")
_HISTORY_QUERY = """
SELECT
bj.id, bj.drive_id, bj.profile, bj.state, bj.operator,
bj.created_at, bj.started_at, bj.finished_at, bj.error_text,
d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
{where}
ORDER BY bj.id DESC
"""
def _state_where(state: str) -> tuple[str, list]:
if state == "all":
return "", []
return "WHERE bj.state = ?", [state]
@router.get("/history", response_class=HTMLResponse)
async def history_list(
request: Request,
state: str = Query(default="all"),
page: int = Query(default=1, ge=1),
db: aiosqlite.Connection = Depends(get_db),
):
if state not in ("all",) + _ALL_STATES:
state = "all"
where_clause, params = _state_where(state)
# Total count
count_sql = f"SELECT COUNT(*) FROM burnin_jobs bj JOIN drives d ON d.id = bj.drive_id {where_clause}"
cur = await db.execute(count_sql, params)
total_count = (await cur.fetchone())[0]
total_pages = max(1, (total_count + _PAGE_SIZE - 1) // _PAGE_SIZE)
page = min(page, total_pages)
offset = (page - 1) * _PAGE_SIZE
# Per-state counts for badges
cur = await db.execute(
"SELECT state, COUNT(*) FROM burnin_jobs GROUP BY state"
)
counts = {"all": total_count if state == "all" else 0}
for r in await cur.fetchall():
counts[r[0]] = r[1]
if state != "all":
cur2 = await db.execute("SELECT COUNT(*) FROM burnin_jobs")
counts["all"] = (await cur2.fetchone())[0]
# Job rows
sql = _HISTORY_QUERY.format(where=where_clause) + " LIMIT ? OFFSET ?"
cur = await db.execute(sql, params + [_PAGE_SIZE, offset])
rows = await cur.fetchall()
jobs = [dict(r) for r in rows]
ps = poller.get_state()
return templates.TemplateResponse(request, "history.html", {
"request": request,
"jobs": jobs,
"active_state": state,
"counts": counts,
"page": page,
"total_pages": total_pages,
"total_count": total_count,
"poller": ps,
**_stale_context(ps),
})
@router.get("/history/{job_id}", response_class=HTMLResponse)
async def history_detail(
request: Request,
job_id: int,
db: aiosqlite.Connection = Depends(get_db),
):
# Job + drive info
cur = await db.execute("""
SELECT
bj.*, d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id = ?
""", (job_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Burn-in job not found")
job = dict(row)
# Stages (with duration)
cur = await db.execute("""
SELECT *,
CAST(
(julianday(finished_at) - julianday(started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_stages
WHERE burnin_job_id = ?
ORDER BY id
""", (job_id,))
job["stages"] = [dict(r) for r in await cur.fetchall()]
ps = poller.get_state()
return templates.TemplateResponse(request, "job_detail.html", {
"request": request,
"job": job,
"poller": ps,
**_stale_context(ps),
})
# ---------------------------------------------------------------------------
# CSV export
# ---------------------------------------------------------------------------
@router.get("/api/v1/burnin/export.csv")
async def burnin_export_csv(db: aiosqlite.Connection = Depends(get_db)):
cur = await db.execute("""
SELECT
bj.id AS job_id,
bj.drive_id,
d.devname,
d.serial,
d.model,
bj.profile,
bj.state,
bj.operator,
bj.created_at,
bj.started_at,
bj.finished_at,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds,
bj.error_text
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
ORDER BY bj.id DESC
""")
rows = await cur.fetchall()
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow([
"job_id", "drive_id", "devname", "serial", "model",
"profile", "state", "operator",
"created_at", "started_at", "finished_at", "duration_seconds",
"error_text",
])
for r in rows:
writer.writerow(list(r))
buf.seek(0)
return StreamingResponse(
iter([buf.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=burnin_history.csv"},
)
# ---------------------------------------------------------------------------
# On-demand email report
# ---------------------------------------------------------------------------
@router.post("/api/v1/report/send")
async def send_report_now():
"""Trigger the daily status email immediately (for testing SMTP config)."""
if not settings.smtp_host:
raise HTTPException(status_code=503, detail="SMTP not configured (SMTP_HOST is empty)")
try:
await mailer.send_report_now()
except Exception as exc:
raise HTTPException(status_code=502, detail=f"Mail send failed: {exc}")
return {"sent": True, "to": settings.smtp_to}
# ---------------------------------------------------------------------------
# Drive notes / location update
# ---------------------------------------------------------------------------
@router.patch("/api/v1/drives/{drive_id}")
async def update_drive(
drive_id: int,
req: UpdateDriveRequest,
db: aiosqlite.Connection = Depends(get_db),
):
cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,))
if not await cur.fetchone():
raise HTTPException(status_code=404, detail="Drive not found")
await db.execute(
"UPDATE drives SET notes=?, location=? WHERE id=?",
(req.notes, req.location, drive_id),
)
await db.commit()
return {"updated": True}
@router.post("/api/v1/drives/{drive_id}/reset")
async def reset_drive(
drive_id: int,
body: dict,
db: aiosqlite.Connection = Depends(get_db),
):
"""
Clear SMART test results for a drive so it shows as fresh.
Only allowed when no burn-in job is active (queued or running).
Preserves all job history — just resets the display state.
"""
cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,))
if not await cur.fetchone():
raise HTTPException(status_code=404, detail="Drive not found")
# Reject if any active burn-in
cur = await db.execute(
"SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')",
(drive_id,),
)
if (await cur.fetchone())[0] > 0:
raise HTTPException(status_code=409, detail="Cannot reset while a burn-in is active")
operator = body.get("operator", "operator")
# Reset SMART test state to idle
await db.execute(
"""UPDATE smart_tests SET state='idle', percent=0, started_at=NULL,
eta_at=NULL, finished_at=NULL, error_text=NULL, raw_output=NULL
WHERE drive_id=?""",
(drive_id,),
)
# Clear SMART attrs cache + stamp reset time (hides prior burn-in from dashboard)
now = datetime.now(timezone.utc).isoformat()
await db.execute(
"UPDATE drives SET smart_attrs=NULL, last_reset_at=? WHERE id=?",
(now, drive_id),
)
# Audit event
await db.execute(
"""INSERT INTO audit_events (event_type, drive_id, operator, message)
VALUES (?,?,?,?)""",
("drive_reset", drive_id, operator, "Drive reset — SMART state cleared"),
)
await db.commit()
poller._notify_subscribers()
return {"reset": True}
# ---------------------------------------------------------------------------
# Audit log page
# ---------------------------------------------------------------------------
_AUDIT_QUERY = """
SELECT
ae.id, ae.event_type, ae.operator, ae.message, ae.created_at,
d.devname, d.serial
FROM audit_events ae
LEFT JOIN drives d ON d.id = ae.drive_id
ORDER BY ae.id DESC
LIMIT 200
"""
_AUDIT_EVENT_COLORS = {
"burnin_queued": "yellow",
"burnin_started": "blue",
"burnin_passed": "passed",
"burnin_failed": "failed",
"burnin_cancelled": "cancelled",
"burnin_stuck": "failed",
"burnin_unknown": "unknown",
}
@router.get("/audit", response_class=HTMLResponse)
async def audit_log(
request: Request,
db: aiosqlite.Connection = Depends(get_db),
):
cur = await db.execute(_AUDIT_QUERY)
rows = [dict(r) for r in await cur.fetchall()]
ps = poller.get_state()
return templates.TemplateResponse(request, "audit.html", {
"request": request,
"events": rows,
"event_colors": _AUDIT_EVENT_COLORS,
"poller": ps,
**_stale_context(ps),
})
# ---------------------------------------------------------------------------
# Stats / analytics page
# ---------------------------------------------------------------------------
@router.get("/stats", response_class=HTMLResponse)
async def stats_page(
request: Request,
db: aiosqlite.Connection = Depends(get_db),
):
# Overall counts
cur = await db.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) as passed,
SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN state='running' THEN 1 ELSE 0 END) as running,
SUM(CASE WHEN state='cancelled' THEN 1 ELSE 0 END) as cancelled
FROM burnin_jobs
""")
overall = dict(await cur.fetchone())
# Failure rate by drive model (only completed jobs)
cur = await db.execute("""
SELECT
COALESCE(d.model, 'Unknown') AS model,
COUNT(*) AS total,
SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) AS passed,
SUM(CASE WHEN bj.state='failed' THEN 1 ELSE 0 END) AS failed,
ROUND(100.0 * SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS pass_rate
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.state IN ('passed', 'failed')
GROUP BY COALESCE(d.model, 'Unknown')
ORDER BY total DESC
LIMIT 20
""")
by_model = [dict(r) for r in await cur.fetchall()]
# Activity last 14 days
cur = await db.execute("""
SELECT
date(created_at) AS day,
COUNT(*) AS total,
SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) AS passed,
SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) AS failed
FROM burnin_jobs
WHERE created_at >= date('now', '-14 days')
GROUP BY date(created_at)
ORDER BY day DESC
""")
by_day = [dict(r) for r in await cur.fetchall()]
# Average test duration by drive size (rounded to nearest TB)
cur = await db.execute("""
SELECT
CAST(ROUND(CAST(d.size_bytes AS REAL) / 1e12) AS INTEGER) AS size_tb,
COUNT(*) AS total,
ROUND(AVG(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400 / 3600.0
), 1) AS avg_hours
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.state IN ('passed', 'failed')
AND bj.started_at IS NOT NULL
AND bj.finished_at IS NOT NULL
GROUP BY size_tb
ORDER BY size_tb
""")
by_size = [dict(r) for r in await cur.fetchall()]
# Failure breakdown by stage (which stage caused the failure)
cur = await db.execute("""
SELECT
COALESCE(bj.stage_name, 'unknown') AS failed_stage,
COUNT(*) AS count
FROM burnin_jobs bj
WHERE bj.state = 'failed'
GROUP BY failed_stage
ORDER BY count DESC
""")
by_failure_stage = [dict(r) for r in await cur.fetchall()]
# Drives tracked
cur = await db.execute("SELECT COUNT(*) FROM drives")
drives_total = (await cur.fetchone())[0]
ps = poller.get_state()
return templates.TemplateResponse(request, "stats.html", {
"request": request,
"overall": overall,
"by_model": by_model,
"by_day": by_day,
"by_size": by_size,
"by_failure_stage": by_failure_stage,
"drives_total": drives_total,
"poller": ps,
**_stale_context(ps),
})
# ---------------------------------------------------------------------------
# Settings page
# ---------------------------------------------------------------------------
@router.get("/settings", response_class=HTMLResponse)
async def settings_page(
request: Request,
db: aiosqlite.Connection = Depends(get_db),
):
# Editable values — real values for form fields (secrets excluded)
editable = {
# SMTP
"smtp_host": settings.smtp_host,
"smtp_port": settings.smtp_port,
"smtp_ssl_mode": settings.smtp_ssl_mode or "starttls",
"smtp_timeout": settings.smtp_timeout,
"smtp_user": settings.smtp_user,
"smtp_from": settings.smtp_from,
"smtp_to": settings.smtp_to,
"smtp_report_hour": settings.smtp_report_hour,
"smtp_daily_report_enabled": settings.smtp_daily_report_enabled,
"smtp_alert_on_fail": settings.smtp_alert_on_fail,
"smtp_alert_on_pass": settings.smtp_alert_on_pass,
# Webhook
"webhook_url": settings.webhook_url,
# Burn-in behaviour
"stuck_job_hours": settings.stuck_job_hours,
"max_parallel_burnins": settings.max_parallel_burnins,
"temp_warn_c": settings.temp_warn_c,
"temp_crit_c": settings.temp_crit_c,
"bad_block_threshold": settings.bad_block_threshold,
"surface_validate_block_size": settings.surface_validate_block_size,
"surface_validate_block_buffer": settings.surface_validate_block_buffer,
"surface_validate_passes": settings.surface_validate_passes,
# SSH credentials (take effect immediately — each SSH call reads live settings)
"ssh_host": settings.ssh_host,
"ssh_port": settings.ssh_port,
"ssh_user": settings.ssh_user,
# Note: ssh_password and ssh_key intentionally omitted from display (sensitive)
# System settings (restart required to fully apply)
"truenas_base_url": settings.truenas_base_url,
"truenas_verify_tls": settings.truenas_verify_tls,
"poll_interval_seconds": settings.poll_interval_seconds,
"stale_threshold_seconds": settings.stale_threshold_seconds,
"allowed_ips": settings.allowed_ips,
"log_level": settings.log_level,
# Note: truenas_api_key intentionally omitted from display (sensitive)
}
from app import ssh_client as _ssh
ps = poller.get_state()
return templates.TemplateResponse(request, "settings.html", {
"request": request,
"editable": editable,
"smtp_enabled": bool(settings.smtp_host),
"ssh_configured": _ssh.is_configured(),
"app_version": settings.app_version,
"poller": ps,
**_stale_context(ps),
})
@router.post("/api/v1/settings")
async def save_settings(body: dict):
"""Save editable runtime settings. Secrets are only updated if non-empty."""
# Don't overwrite secrets if client sent empty string
for secret_field in ("smtp_password", "truenas_api_key", "ssh_password", "ssh_key"):
if secret_field in body and body[secret_field] == "":
del body[secret_field]
try:
saved = settings_store.save(body)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc))
return {"saved": True, "keys": saved}
@router.post("/api/v1/settings/test-smtp")
async def test_smtp():
"""Test the current SMTP configuration without sending an email."""
result = await mailer.test_smtp_connection()
if not result["ok"]:
raise HTTPException(status_code=502, detail=result["error"])
return {"ok": True}
@router.post("/api/v1/settings/test-ssh")
async def test_ssh():
"""Test the current SSH configuration."""
from app import ssh_client
result = await ssh_client.test_connection()
if not result["ok"]:
raise HTTPException(status_code=502, detail=result.get("error", "Connection failed"))
return {"ok": True}
@router.websocket("/ws/terminal")
async def terminal_ws(websocket: WebSocket):
"""WebSocket endpoint bridging the browser xterm.js terminal to an SSH PTY."""
from app import terminal as _term
await _term.handle(websocket)
@router.get("/api/v1/updates/check")
async def check_updates():
"""Check for a newer release on Forgejo."""
import httpx
current = settings.app_version
try:
async with httpx.AsyncClient(timeout=8.0) as client:
r = await client.get(
"https://git.hellocomputer.xyz/api/v1/repos/brandon/truenas-burnin/releases/latest",
headers={"Accept": "application/json"},
)
if r.status_code == 200:
data = r.json()
latest = data.get("tag_name", "").lstrip("v")
up_to_date = not latest or latest == current
return {
"current": current,
"latest": latest or None,
"update_available": not up_to_date,
"message": None,
}
elif r.status_code == 404:
return {"current": current, "latest": None, "update_available": False,
"message": "No releases published yet"}
else:
return {"current": current, "latest": None, "update_available": False,
"message": f"Forgejo API returned {r.status_code}"}
except Exception as exc:
return {"current": current, "latest": None, "update_available": False,
"message": f"Could not reach update server: {exc}"}
# ---------------------------------------------------------------------------
# Print view (must be BEFORE /{job_id} int route)
# ---------------------------------------------------------------------------
@router.get("/history/{job_id}/print", response_class=HTMLResponse)
async def history_print(
request: Request,
job_id: int,
db: aiosqlite.Connection = Depends(get_db),
):
cur = await db.execute("""
SELECT
bj.*, d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id = ?
""", (job_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Job not found")
job = dict(row)
cur = await db.execute("""
SELECT *,
CAST(
(julianday(finished_at) - julianday(started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_stages WHERE burnin_job_id=? ORDER BY id
""", (job_id,))
job["stages"] = [dict(r) for r in await cur.fetchall()]
return templates.TemplateResponse(request, "job_print.html", {
"request": request,
"job": job,
})
# ---------------------------------------------------------------------------
# Burn-in job detail API (must be after export.csv to avoid int coercion)
# ---------------------------------------------------------------------------
@router.get("/api/v1/burnin/{job_id}", response_model=BurninJobResponse)
async def burnin_get(job_id: int, db: aiosqlite.Connection = Depends(get_db)):
db.row_factory = aiosqlite.Row
cur = await db.execute("SELECT * FROM burnin_jobs WHERE id=?", (job_id,))
row = await cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Burn-in job not found")
cur = await db.execute(
"SELECT * FROM burnin_stages WHERE burnin_job_id=? ORDER BY id", (job_id,)
)
stages = await cur.fetchall()
return _row_to_burnin(row, stages)