nas-burnin/app/routes.py
Brandon Walter eb2a964171
Some checks are pending
Security scan / pip-audit (push) Waiting to run
Security scan / bandit (push) Waiting to run
Security scan / gitleaks (push) Waiting to run
fix: address Codex review of burnin package split (1.0.0-32)
Three LOW-severity findings from Codex's audit of the post-split
package, all small mechanical cleanups:

#1 routes.py:848 read burnin.UNLOCK_TTL_SECONDS — a snapshot alias
   bound at import time. After a test (or runtime) monkey-patches
   app.burnin.unlock.UNLOCK_TTL_SECONDS the API response would
   advertise the OLD value while grant_pool_unlock used the new one.
   Now reads burnin.unlock.UNLOCK_TTL_SECONDS directly so the API
   stays in sync with whatever the actual source-of-truth is.

#2 _stage_surface_validate_ssh() carried dead extraction scaffolding
   from when the badblocks logic was first inlined into burnin.py:
   _is_cancelled_sync (sync wrapper that does run_until_complete in
   a coroutine — would deadlock if ever called), last_logged_pct,
   on_progress, accumulated_lines, on_progress_async — none on any
   control-flow path. Plus result["output"] which was set but never
   read. All deleted; the inline _drain coroutines below already
   handle progress/log throttling correctly.

#3 The new module boundaries were leaking — root orchestration
   mutated _remote_pids and _unlock_grants directly even though
   kill.clear_remote_pid() and unlock.invalidate_grant() existed.
   Now using the helpers, so a future change to the storage shape
   only requires editing the owning module.

Bonus from Codex's check note: _get_client() now asserts
burnin._client is not None with a clear message instead of relying
on an obscure NoneType AttributeError if a stage is somehow called
before init().

Verified: 44/44 tests pass; container boots clean; /health 200.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 01:35:07 -04:00

1544 lines
57 KiB
Python

import asyncio
import csv
import io
import json
import time as _time
from datetime import datetime, timezone
import aiosqlite
from fastapi import APIRouter, Depends, HTTPException, Query, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from sse_starlette.sse import EventSourceResponse
from app import auth, burnin, mailer, poller, settings_store
from app.config import settings
from app.database import get_db
from app.models import (
BurninJobResponse, BurninStageResponse,
CancelBurninRequest, DriveResponse,
SmartTestState, StartBurninRequest, UnlockPoolDriveRequest,
UpdateDriveRequest,
)
from app.renderer import templates
# Shared router instance; the @router.get/@router.post decorators below
# register this module's handlers on it.
router = APIRouter()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _eta_seconds(eta_at: str | None) -> int | None:
if not eta_at:
return None
try:
eta_ts = datetime.fromisoformat(eta_at)
if eta_ts.tzinfo is None:
eta_ts = eta_ts.replace(tzinfo=timezone.utc)
remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds()
return max(0, int(remaining))
except Exception:
return None
def _is_stale(last_polled_at: str) -> bool:
    """Report whether ``last_polled_at`` is older than the stale threshold.

    The timestamp is parsed as ISO-8601 (naive values are taken as UTC) and
    compared against ``settings.stale_threshold_seconds``. Any failure to
    parse or compare is treated as stale.
    """
    try:
        polled = datetime.fromisoformat(last_polled_at)
        if polled.tzinfo is None:
            polled = polled.replace(tzinfo=timezone.utc)
        age_seconds = (datetime.now(timezone.utc) - polled).total_seconds()
        return age_seconds > settings.stale_threshold_seconds
    except Exception:
        return True
def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None:
"""Linear ETA extrapolation from started_at and percent complete."""
if not started_at or percent <= 0:
return None
try:
start = datetime.fromisoformat(started_at)
if start.tzinfo is None:
start = start.replace(tzinfo=timezone.utc)
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
total_est = elapsed / (percent / 100)
remaining = max(0, int(total_est - elapsed))
return remaining
except Exception:
return None
def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
    """Build a SmartTestState from the ``<prefix>_*`` columns of ``row``.

    ``prefix`` selects the column family (the drives query aliases them as
    ``short_*`` and ``long_*``). A missing/empty state is reported as "idle".
    """
    def column(suffix: str):
        # All SMART columns share the same "<prefix>_<suffix>" alias scheme.
        return row[f"{prefix}_{suffix}"]

    eta_timestamp = column("eta_at")
    fields = {
        "state": column("state") or "idle",
        "percent": column("percent"),
        "eta_seconds": _eta_seconds(eta_timestamp),
        "eta_timestamp": eta_timestamp,
        "started_at": column("started_at"),
        "finished_at": column("finished_at"),
        "error_text": column("error"),
    }
    return SmartTestState(**fields)
def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
    """Convert one row of the drives query into a DriveResponse.

    Fills in derived fields: staleness of the last poll, the short/long SMART
    sub-objects, and the pool-unlock expiry reported by the burnin package.
    """
    drive_id = row["id"]
    polled_at = row["last_polled_at"]
    pool_name = row["pool_name"]
    pool_role = row["pool_role"]

    fields = {
        "id": drive_id,
        "devname": row["devname"],
        "serial": row["serial"],
        "model": row["model"],
        "size_bytes": row["size_bytes"],
        "temperature_c": row["temperature_c"],
        # Missing health is surfaced as the literal "UNKNOWN".
        "smart_health": row["smart_health"] or "UNKNOWN",
        "last_polled_at": polled_at,
        "is_stale": _is_stale(polled_at),
        "smart_short": _build_smart(row, "short"),
        "smart_long": _build_smart(row, "long"),
        "notes": row["notes"],
        "location": row["location"],
        "pool_name": pool_name,
        "pool_role": pool_role,
        "pool_unlocked_until": burnin.unlock_expiry(drive_id, pool_name, pool_role),
    }
    return DriveResponse(**fields)
def _compute_status(drive: dict) -> str:
short = (drive.get("smart_short") or {}).get("state", "idle")
long_ = (drive.get("smart_long") or {}).get("state", "idle")
health = drive.get("smart_health", "UNKNOWN")
if "running" in (short, long_):
return "running"
if short == "failed" or long_ == "failed" or health == "FAILED":
return "failed"
if "passed" in (short, long_):
return "passed"
return "idle"
# Base SELECT for drive listings. Each drive row is LEFT JOINed twice against
# smart_tests (once for test_type 'short', once for 'long') and the SMART
# columns are aliased short_* / long_* — the names _build_smart() reads.
# Only drives with last_seen_at in the past 7 days are returned.
# `{where}` is filled via str.format() with either "" or an extra "AND ..."
# clause; values themselves are always bound as query parameters.
_DRIVES_QUERY = """
SELECT
d.id, d.devname, d.serial, d.model, d.size_bytes,
d.temperature_c, d.smart_health, d.last_polled_at,
d.notes, d.location, d.pool_name, d.pool_role,
s.state AS short_state,
s.percent AS short_percent,
s.started_at AS short_started_at,
s.eta_at AS short_eta_at,
s.finished_at AS short_finished_at,
s.error_text AS short_error,
l.state AS long_state,
l.percent AS long_percent,
l.started_at AS long_started_at,
l.eta_at AS long_eta_at,
l.finished_at AS long_finished_at,
l.error_text AS long_error
FROM drives d
LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
WHERE d.last_seen_at >= datetime('now', '-7 days')
{where}
ORDER BY d.devname
"""
async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
    """Map each drive_id to its most recent burn-in job, as a plain dict.

    Only the highest-id job per drive is considered (whatever its state), and
    it is left out when it was not created after the drive's
    ``last_reset_at``. That way a reset clears the dashboard burn-in column
    while the job rows themselves stay in the table for history.
    """
    cursor = await db.execute("""
SELECT bj.*
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at)
""")
    latest_by_drive: dict[int, dict] = {}
    for record in await cursor.fetchall():
        latest_by_drive[record["drive_id"]] = dict(record)
    return latest_by_drive
async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
    """Build the list of drive dicts that the dashboard templates render.

    Each entry is a dumped DriveResponse extended with:
      * ``status`` — the summary from _compute_status();
      * ``burnin`` — the drive's latest burn-in job dict, or ``None``.

    For drives whose latest burn-in job is queued or running, that job's
    short_smart / long_smart stages (running, passed or failed) are mirrored
    into ``smart_short`` / ``smart_long`` — but only when the standalone
    SMART entry is missing or idle, so a real standalone test always wins.
    """
    cursor = await db.execute(_DRIVES_QUERY.format(where=""))
    drive_rows = await cursor.fetchall()
    latest_jobs = await _fetch_burnin_by_drive(db)

    active_job_ids = [
        job["id"]
        for job in latest_jobs.values()
        if job["state"] in ("running", "queued")
    ]

    # job_id -> {stage_name: stage row as dict}
    smart_stages_by_job: dict[int, dict[str, dict]] = {}
    if active_job_ids:
        # Only the "?,?,?" marker list is spliced into the SQL text; the IDs
        # are bound as parameters. Plain concatenation (not an f-string)
        # keeps bandit's B608 check quiet.
        markers = ",".join("?" * len(active_job_ids))
        stage_sql = (
            "SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, "
            " bs.started_at, bs.finished_at, bs.error_text "
            "FROM burnin_stages bs "
            "WHERE bs.burnin_job_id IN (" + markers + ") "
            " AND bs.stage_name IN ('short_smart', 'long_smart') "
            " AND bs.state IN ('running', 'passed', 'failed')"
        )
        cursor = await db.execute(stage_sql, active_job_ids)
        for stage_row in await cursor.fetchall():
            per_job = smart_stages_by_job.setdefault(stage_row["burnin_job_id"], {})
            per_job[stage_row["stage_name"]] = dict(stage_row)

    entries = []
    for drive_row in drive_rows:
        entry = _row_to_drive(drive_row).model_dump()
        entry["status"] = _compute_status(entry)
        job = latest_jobs.get(entry["id"])
        entry["burnin"] = job

        job_stages = smart_stages_by_job.get(job["id"], {}) if job else {}
        for stage_name, stage in job_stages.items():
            column = "smart_short" if stage_name == "short_smart" else "smart_long"
            current = entry.get(column) or {}
            if current.get("state") not in (None, "idle"):
                # A standalone test already occupies this column.
                continue

            stage_state = stage["state"]
            progress = stage["percent"] or 0
            if stage_state == "running":
                shown_percent = progress
                eta = _compute_eta_seconds(stage["started_at"], progress)
            elif stage_state == "passed":
                shown_percent, eta = 100, None
            else:
                shown_percent, eta = 0, None

            entry[column] = {
                "state": stage_state,
                "percent": shown_percent,
                "eta_seconds": eta,
                "eta_timestamp": None,
                "started_at": stage["started_at"],
                "finished_at": stage["finished_at"],
                "error_text": stage["error_text"],
            }
        entries.append(entry)
    return entries
def _stale_context(poller_state: dict) -> dict:
    """Derive the ``stale`` / ``stale_seconds`` template variables.

    Reads ``last_poll_at`` from the poller state. When it is absent or cannot
    be parsed, the result reports "not stale" with zero seconds.
    """
    fallback = {"stale": False, "stale_seconds": 0}
    last_poll = poller_state.get("last_poll_at")
    if not last_poll:
        return fallback
    try:
        polled = datetime.fromisoformat(last_poll)
        if polled.tzinfo is None:
            polled = polled.replace(tzinfo=timezone.utc)
        age = int((datetime.now(timezone.utc) - polled).total_seconds())
        return {
            "stale": age > settings.stale_threshold_seconds,
            "stale_seconds": age,
        }
    except Exception:
        return fallback
# ---------------------------------------------------------------------------
# Auth — login / logout / first-user setup
# ---------------------------------------------------------------------------
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, next: str = "/", error: str | None = None):
    """Render the login form (or first-user setup when no users exist).

    ``next`` is the post-login destination and is echoed into the form. It is
    only honoured when it is a same-site path; anything else falls back to
    "/". ``error`` is an optional message shown on the page.
    """
    needs_setup = (await auth.user_count()) == 0
    # "//host" is a protocol-relative URL and browsers normalise "/\host" to
    # it, so a bare startswith("/") check still allows redirects off-site.
    is_local_path = next.startswith("/") and not next.startswith(("//", "/\\"))
    return templates.TemplateResponse(request, "login.html", {
        "request": request,
        "needs_setup": needs_setup,
        "error": error,
        "next": next if is_local_path else "/",
    })
def _client_ip(request: Request) -> str:
    """Return the caller's address for audit messages and login throttling.

    Uses the first entry of the ``X-Forwarded-For`` header when present;
    otherwise the connection's peer host, or "unknown" if there is none.
    """
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it — confirm the deployment always sits behind one.
    header_value = request.headers.get("X-Forwarded-For") or ""
    first_hop = header_value.split(",")[0].strip()
    if first_hop:
        return first_hop
    if request.client:
        return request.client.host
    return "unknown"
@router.post("/login")
async def login_submit(request: Request):
    """Handle the login form: throttle, verify credentials, start a session.

    Form fields: ``username``, ``password`` and optional ``next`` (the
    post-login destination, restricted to same-site paths).

    Responses:
      * 429 + login page when the user/IP is locked out;
      * 401 + login page on bad credentials;
      * 303 redirect to ``next`` on success.
    """
    form = await request.form()
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""
    next_url = form.get("next") or "/"
    # Same-site paths only. "//host" is protocol-relative and browsers treat
    # "/\host" the same way, so both must be rejected or this endpoint
    # becomes an open redirect after a successful login.
    if not next_url.startswith("/") or next_url.startswith(("//", "/\\")):
        next_url = "/"
    ip = _client_ip(request)

    # Atomic register-and-check: increments the counter NOW (before any
    # await), so a parallel burst of guesses can't all slip past the
    # threshold. Cleared on successful auth via clear_login_failures.
    attempt = auth.register_login_attempt(username, ip)
    if attempt != "ok":
        if attempt == "now_locked_out":
            # Audit only the attempt that tripped the lockout.
            await auth.audit_auth_event(
                "user_login_locked_out", username,
                f"Failed login from {ip} — IP/user locked out for {auth.LOGIN_LOCKOUT_SECONDS // 60} min",
            )
        locked_until = auth.login_locked_until(username, ip)
        remaining = int((locked_until or _time.time()) - _time.time())
        return templates.TemplateResponse(request, "login.html", {
            "request": request,
            "needs_setup": False,
            "error": f"Too many failed attempts. Try again in {remaining // 60 + 1} min.",
            "next": next_url,
        }, status_code=429)

    found = await auth.get_user_by_username(username)
    if not found or not auth.verify_password(password, found[1]):
        # Constant-ish-time: still call verify on a junk hash if user missing
        # so the timing of "user not found" matches "wrong password."
        if not found:
            auth.verify_password(password, "$2b$12$" + "x" * 53)
        await auth.audit_auth_event(
            "user_login_failed", username, f"Failed login from {ip}",
        )
        return templates.TemplateResponse(request, "login.html", {
            "request": request,
            "needs_setup": False,
            "error": "Invalid username or password.",
            "next": next_url,
        }, status_code=401)

    user = found[0]
    auth.clear_login_failures(username, ip)
    # Clear any pre-login session keys before populating the new identity.
    # Closes session-fixation: if an attacker had somehow seeded the
    # browser with a session cookie, this discards everything in it
    # before issuing the new authenticated payload.
    request.session.clear()
    request.session["user_id"] = user.id
    request.session["username"] = user.username
    await auth.touch_last_login(user.id)
    await auth.audit_auth_event(
        "user_login", user.username, f"Signed in from {ip}"
    )
    return RedirectResponse(url=next_url, status_code=303)
@router.post("/api/v1/auth/setup")
async def auth_first_user_setup(request: Request):
    """Bootstrap the first admin account from the login page.

    The endpoint is public but refuses with 409 as soon as any user exists.
    Form fields: ``username``, ``password``, optional ``full_name``. A
    ``ValueError`` from user creation is reported as 400. On success the new
    admin is signed in and redirected to "/".
    """
    existing_users = await auth.user_count()
    if existing_users > 0:
        raise HTTPException(status_code=409, detail="Users already exist.")

    form = await request.form()
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""
    full_name = (form.get("full_name") or "").strip() or None

    try:
        # bootstrap_only=True makes create_user do its existence check and
        # insert inside one IMMEDIATE transaction, so two concurrent setup
        # requests cannot both end up creating an admin.
        admin = await auth.create_user(
            username, password, full_name, is_admin=True, bootstrap_only=True
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Session-fixation defense, as in the login flow: drop whatever the
    # browser's session held before writing the authenticated identity.
    session = request.session
    session.clear()
    session["user_id"] = admin.id
    session["username"] = admin.username
    await auth.touch_last_login(admin.id)
    return RedirectResponse(url="/", status_code=303)
@router.get("/logout")
@router.post("/logout")
async def logout(request: Request):
    """End the session and redirect to the login page.

    When a current user is attached to the request, the sign-out is recorded
    as an audit event before the session is cleared.
    """
    current = getattr(request.state, "current_user", None)
    if current:
        origin = _client_ip(request)
        await auth.audit_auth_event(
            "user_logout", current.username, f"Signed out from {origin}"
        )
    request.session.clear()
    return RedirectResponse(url="/login", status_code=303)
@router.post("/api/v1/auth/change-password")
async def change_password(request: Request):
    """Change the signed-in user's password.

    Form fields: ``current_password``, ``new_password``,
    ``confirm_password``. Responds 401 without a current user, 400 when the
    two new passwords differ or auth.change_password raises ``ValueError``.
    A successful change is audited and answered with ``{"ok": True}``.
    """
    current_user = getattr(request.state, "current_user", None)
    if not current_user:
        raise HTTPException(status_code=401, detail="Authentication required")

    form = await request.form()
    old_password = form.get("current_password") or ""
    new_password = form.get("new_password") or ""
    repeated = form.get("confirm_password") or ""
    if new_password != repeated:
        raise HTTPException(status_code=400, detail="New passwords do not match.")

    try:
        await auth.change_password(current_user.id, old_password, new_password)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    await auth.audit_auth_event(
        "user_password_changed",
        current_user.username,
        f"Password changed from {_client_ip(request)}",
    )
    return {"ok": True}
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, db: aiosqlite.Connection = Depends(get_db)):
    """Render the main dashboard with the drive table and poller status."""
    drive_entries = await _fetch_drives_for_template(db)
    poller_state = poller.get_state()
    context = {
        "request": request,
        "drives": drive_entries,
        "poller": poller_state,
    }
    # Adds the "stale" / "stale_seconds" keys used by the page banner.
    context.update(_stale_context(poller_state))
    return templates.TemplateResponse(request, "dashboard.html", context)
# ---------------------------------------------------------------------------
# SSE — live drive table updates
# ---------------------------------------------------------------------------
@router.get("/sse/drives")
async def sse_drives(request: Request):
    """Server-sent-events stream that pushes live dashboard updates.

    Events emitted:
      * ``keepalive``      — empty payload when nothing arrives for 25 s;
      * ``drives-update``  — freshly rendered drives table HTML;
      * ``system-sensors`` — JSON with system temps, thermal pressure and
                             the configured warn/crit temperature limits;
      * ``job-alert``      — JSON alert, only when the payload carried one.

    The stream ends when the client is detected as disconnected; the
    subscription is released in the generator's ``finally``.
    """
    # NOTE(review): the subscription is taken before the generator starts, and
    # unsubscribe only runs from the generator's finally — confirm the
    # response always starts iterating it.
    q = poller.subscribe()
    async def generate():
        try:
            while True:
                # Wait for next poll notification or keepalive timeout
                try:
                    payload = await asyncio.wait_for(q.get(), timeout=25.0)
                except asyncio.TimeoutError:
                    # Idle period: stop if the client went away, otherwise
                    # send a keepalive and go back to waiting.
                    if await request.is_disconnected():
                        break
                    yield {"event": "keepalive", "data": ""}
                    continue
                if await request.is_disconnected():
                    break
                # Extract alert from payload (may be None for regular polls)
                alert = None
                if isinstance(payload, dict):
                    alert = payload.get("alert")
                # Render fresh table HTML
                # A dedicated connection is opened per update rather than
                # holding one for the lifetime of the stream.
                async with aiosqlite.connect(settings.db_path) as db:
                    db.row_factory = aiosqlite.Row
                    await db.execute("PRAGMA journal_mode=WAL")
                    drives = await _fetch_drives_for_template(db)
                html = templates.env.get_template(
                    "components/drives_table.html"
                ).render(drives=drives)
                yield {"event": "drives-update", "data": html}
                # Push system sensor state so JS can update temp chips live
                ps = poller.get_state()
                yield {
                    "event": "system-sensors",
                    "data": json.dumps({
                        "system_temps": ps.get("system_temps", {}),
                        "thermal_pressure": ps.get("thermal_pressure", "ok"),
                        "temp_warn_c": settings.temp_warn_c,
                        "temp_crit_c": settings.temp_crit_c,
                    }),
                }
                # Push browser notification event if this was a job completion
                if alert:
                    yield {"event": "job-alert", "data": json.dumps(alert)}
        finally:
            # Always release the subscription once the generator exits.
            poller.unsubscribe(q)
    return EventSourceResponse(generate())
# ---------------------------------------------------------------------------
# JSON API
# ---------------------------------------------------------------------------
@router.get("/health")
async def health(db: aiosqlite.Connection = Depends(get_db)):
    """Real readiness check, not just process-is-running.

    Verifies:
      (a) the DB is writable (round-trip through a connection-local temp
          table);
      (b) the poller reports healthy and, when a last-poll time is known,
          that it is no older than 3x ``stale_threshold_seconds``;
      (c) SSH is reachable, when SSH is configured.

    Returns a JSON body with per-check results; HTTP 200 when every check
    passes, 503 otherwise so a proxy/orchestrator health probe can take the
    container out of rotation. ``drives_tracked`` is ``None`` when the drive
    count itself could not be read.
    """
    from fastapi.responses import JSONResponse
    from app import ssh_client as _ssh

    checks: dict[str, dict] = {}

    # DB probe — actually exercise the write path (read-only mounts,
    # full disks, broken WAL all silently pass a journal_mode read).
    # Uses a temp table that lives only inside the connection so the
    # round-trip touches the writer without polluting real data.
    try:
        await db.execute(
            "CREATE TEMP TABLE IF NOT EXISTS _hc (k INTEGER PRIMARY KEY, v TEXT)"
        )
        await db.execute(
            "INSERT OR REPLACE INTO _hc (k, v) VALUES (1, ?)",
            (datetime.now(timezone.utc).isoformat(),),
        )
        cur = await db.execute("SELECT v FROM _hc WHERE k=1")
        row = await cur.fetchone()
        await db.commit()
        checks["db"] = {"ok": bool(row)}
    except Exception as exc:
        checks["db"] = {"ok": False, "error": str(exc)}

    # Poller probe — healthy flag plus age of the last poll, when known.
    ps = poller.get_state()
    last = ps.get("last_poll_at")
    poll_age = None
    if last:
        try:
            t = datetime.fromisoformat(last)
            if t.tzinfo is None:
                t = t.replace(tzinfo=timezone.utc)
            poll_age = (datetime.now(timezone.utc) - t).total_seconds()
        except Exception:
            poll_age = None
    poll_ok = ps.get("healthy") and (
        poll_age is None or poll_age <= settings.stale_threshold_seconds * 3
    )
    checks["poller"] = {
        "ok": bool(poll_ok),
        "last_error": ps.get("last_error"),
        "last_poll_at": last,
        "age_seconds": int(poll_age) if poll_age is not None else None,
    }

    # SSH probe — only when configured. Cheap (single sensors -j).
    if _ssh.is_configured():
        try:
            r = await _ssh.test_connection()
            checks["ssh"] = {"ok": bool(r.get("ok")), "error": r.get("error")}
        except Exception as exc:
            checks["ssh"] = {"ok": False, "error": str(exc)}
    else:
        checks["ssh"] = {"ok": True, "skipped": True}

    # The drive count must not be able to crash the probe: an unreadable
    # drives table has to surface as a 503 "degraded" body, not as an
    # unhandled 500 that hides the per-check details.
    try:
        cur = await db.execute("SELECT COUNT(*) FROM drives")
        row = await cur.fetchone()
        drives_tracked = row[0] if row else 0
    except Exception as exc:
        drives_tracked = None
        if checks["db"]["ok"]:
            checks["db"] = {"ok": False, "error": str(exc)}

    status_ok = all(c["ok"] for c in checks.values())
    body = {
        "status": "ok" if status_ok else "degraded",
        "checks": checks,
        "drives_tracked": drives_tracked,
        "poll_interval_s": settings.poll_interval_seconds,
        "version": settings.app_version,
    }
    return JSONResponse(body, status_code=200 if status_ok else 503)
@router.get("/api/v1/drives", response_model=list[DriveResponse])
async def list_drives(db: aiosqlite.Connection = Depends(get_db)):
    """Return every drive matched by the base drives query as DriveResponse objects."""
    cursor = await db.execute(_DRIVES_QUERY.format(where=""))
    drives = []
    for record in await cursor.fetchall():
        drives.append(_row_to_drive(record))
    return drives
@router.get("/api/v1/drives/{drive_id}/drawer")
async def drive_drawer(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return the JSON payload behind the per-drive log drawer.

    The payload bundles:
      * ``drive``  — identifying fields of the drive;
      * ``burnin`` — the drive's latest burn-in job with its ``stages``
                     (including log_text and bad_blocks), or ``None``;
      * ``smart``  — short/long test state plus stored ``raw_output``, and
                     the cached SMART attributes under ``attrs``;
      * ``events`` — the 50 newest audit events for the drive.

    Responds 404 when the drive is not matched by the drives query.
    """
    cursor = await db.execute(_DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,))
    drive_row = await cursor.fetchone()
    if not drive_row:
        raise HTTPException(status_code=404, detail="Drive not found")
    drive = _row_to_drive(drive_row)

    # Latest burn-in job and, if there is one, all of its stages.
    cursor = await db.execute(
        "SELECT * FROM burnin_jobs WHERE drive_id=? ORDER BY id DESC LIMIT 1",
        (drive_id,),
    )
    latest_job = await cursor.fetchone()
    job_payload = None
    if latest_job:
        job_payload = dict(latest_job)
        cursor = await db.execute(
            "SELECT id, stage_name, state, percent, started_at, finished_at, "
            "duration_seconds, error_text, log_text, bad_blocks "
            "FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
            (latest_job["id"],),
        )
        job_payload["stages"] = [dict(stage) for stage in await cursor.fetchall()]

    # Stored SMART test rows, keyed by test type, for their raw output.
    cursor = await db.execute(
        "SELECT test_type, state, percent, started_at, finished_at, error_text, raw_output "
        "FROM smart_tests WHERE drive_id=?",
        (drive_id,),
    )
    smart_by_type = {}
    for test_row in await cursor.fetchall():
        smart_by_type[test_row["test_type"]] = dict(test_row)

    # Cached SMART attributes are a JSON blob on the drives table; an
    # unparseable blob is reported as None rather than failing the request.
    cursor = await db.execute("SELECT smart_attrs FROM drives WHERE id=?", (drive_id,))
    attrs_row = await cursor.fetchone()
    smart_attrs = None
    raw_attrs = attrs_row["smart_attrs"] if attrs_row else None
    if raw_attrs:
        try:
            smart_attrs = json.loads(raw_attrs)
        except Exception:
            smart_attrs = None

    # Newest-first audit trail, capped at 50 entries.
    cursor = await db.execute("""
SELECT id, event_type, operator, message, created_at
FROM audit_events
WHERE drive_id = ?
ORDER BY id DESC
LIMIT 50
""", (drive_id,))
    events = [dict(event) for event in await cursor.fetchall()]

    smart_section = {}
    for kind, state_obj in (("short", drive.smart_short), ("long", drive.smart_long)):
        card = state_obj.model_dump() if state_obj else {}
        card["raw_output"] = smart_by_type.get(kind, {}).get("raw_output")
        smart_section[kind] = card
    smart_section["attrs"] = smart_attrs

    return {
        "drive": {
            "id": drive.id,
            "devname": drive.devname,
            "serial": drive.serial,
            "model": drive.model,
            "size_bytes": drive.size_bytes,
        },
        "burnin": job_payload,
        "smart": smart_section,
        "events": events,
    }
@router.get("/api/v1/drives/{drive_id}", response_model=DriveResponse)
async def get_drive(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return a single drive by id, or 404 when the drives query has no match."""
    single_drive_sql = _DRIVES_QUERY.format(where="AND d.id = ?")
    cursor = await db.execute(single_drive_sql, (drive_id,))
    record = await cursor.fetchone()
    if record:
        return _row_to_drive(record)
    raise HTTPException(status_code=404, detail="Drive not found")
@router.post("/api/v1/drives/{drive_id}/smart/start")
async def smart_start(
    drive_id: int,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Start a standalone SHORT or LONG SMART test on a single drive.

    Uses SSH (smartctl) when configured — required for TrueNAS SCALE 25.10+
    where the REST smart/test endpoint no longer exists.
    Falls back to TrueNAS REST API for older versions.

    ``body["type"]`` selects the test ("SHORT"/"LONG", case-insensitive).

    Raises (as HTTP responses):
      * 422 — ``type`` missing, not a string, or not SHORT/LONG;
      * 404 — unknown drive id;
      * 502 — the SSH or TrueNAS call failed;
      * 503 — REST path selected but the TrueNAS client is not initialised.
    """
    from app import ssh_client

    requested = body.get("type")
    # A non-string "type" (e.g. a JSON number) must yield 422, not an
    # AttributeError from .upper() that surfaces as a 500.
    test_type = requested.upper() if isinstance(requested, str) else ""
    if test_type not in ("SHORT", "LONG"):
        raise HTTPException(status_code=422, detail="type must be SHORT or LONG")

    cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    devname = row[0]
    now = datetime.now(timezone.utc).isoformat()
    ttype_lower = test_type.lower()

    if ssh_client.is_configured():
        # SSH path — works on TrueNAS SCALE 25.10+ and CORE
        try:
            output = await ssh_client.start_smart_test(devname, test_type)
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"SSH error: {exc}") from exc
        # Mark as running in DB (truenas_job_id=NULL signals SSH-managed test)
        # Store smartctl start output as proof the test was initiated
        await db.execute(
            """INSERT INTO smart_tests (drive_id, test_type, state, percent, started_at, raw_output)
VALUES (?,?,?,?,?,?)
ON CONFLICT(drive_id, test_type) DO UPDATE SET
state='running', percent=0, truenas_job_id=NULL,
started_at=excluded.started_at, finished_at=NULL, error_text=NULL,
raw_output=excluded.raw_output""",
            (drive_id, ttype_lower, "running", 0, now, output),
        )
        await db.commit()
        # Wake SSE subscribers so the dashboard shows the running test now.
        poller._notify_subscribers()
        return {"devname": devname, "type": test_type, "message": output[:200]}

    # REST path — older TrueNAS CORE / SCALE versions
    client = burnin._client
    if client is None:
        raise HTTPException(status_code=503, detail="TrueNAS client not ready")
    try:
        tn_job_id = await client.start_smart_test([devname], test_type)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") from exc
    return {"job_id": tn_job_id, "devname": devname, "type": test_type}
@router.post("/api/v1/drives/{drive_id}/smart/cancel")
async def smart_cancel(
    drive_id: int,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Cancel a running standalone SMART test on a drive.

    With SSH configured the test is aborted via ``ssh_client``; otherwise the
    running TrueNAS SMART job for the drive is looked up and aborted over
    REST. Afterwards the matching ``smart_tests`` row (if still "running")
    is marked "aborted".

    ``body["type"]`` selects the test ("short"/"long", case-insensitive).

    Raises (as HTTP responses):
      * 422 — ``type`` missing, not a string, or not short/long;
      * 404 — unknown drive id, or (REST path) no running job for the drive;
      * 502 — the SSH or TrueNAS call failed;
      * 503 — REST path selected but the TrueNAS client is not initialised.
    """
    from app import ssh_client

    requested = body.get("type")
    # A non-string "type" must yield 422 rather than crash on .lower().
    test_type = requested.lower() if isinstance(requested, str) else ""
    if test_type not in ("short", "long"):
        raise HTTPException(status_code=422, detail="type must be 'short' or 'long'")

    cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    devname = row[0]

    if ssh_client.is_configured():
        # SSH path — abort via smartctl -X
        try:
            await ssh_client.abort_smart_test(devname)
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"SSH abort error: {exc}") from exc
    else:
        # REST path — find TrueNAS job and abort it.
        # The client is only needed here, so its readiness is checked here;
        # checking it up front blocked SSH-only cancels with a spurious 503.
        client = burnin._client
        if client is None:
            raise HTTPException(status_code=503, detail="TrueNAS client not ready")
        try:
            jobs = await client.get_smart_jobs()
            tn_job_id = None
            for j in jobs:
                if j.get("state") != "RUNNING":
                    continue
                args = j.get("arguments", [])
                if not args or not isinstance(args[0], dict):
                    continue
                if devname in args[0].get("disks", []):
                    tn_job_id = j["id"]
                    break
            if tn_job_id is None:
                raise HTTPException(status_code=404, detail="No running SMART test found for this drive")
            await client.abort_job(tn_job_id)
        except HTTPException:
            # Let the 404 above through untouched.
            raise
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") from exc

    # Update local DB state
    now = datetime.now(timezone.utc).isoformat()
    await db.execute(
        "UPDATE smart_tests SET state='aborted', finished_at=? WHERE drive_id=? AND test_type=? AND state='running'",
        (now, drive_id, test_type),
    )
    await db.commit()
    return {"cancelled": True, "devname": devname, "type": test_type}
# ---------------------------------------------------------------------------
# Burn-in API
# ---------------------------------------------------------------------------
def _row_to_burnin(row: aiosqlite.Row, stages: list[aiosqlite.Row]) -> BurninJobResponse:
    """Convert a burnin_jobs row and its stage rows into a BurninJobResponse.

    A NULL/zero ``percent`` on the job or on any stage is reported as 0.
    """
    job_fields = {
        "id": row["id"],
        "drive_id": row["drive_id"],
        "profile": row["profile"],
        "state": row["state"],
        "percent": row["percent"] or 0,
        "stage_name": row["stage_name"],
        "operator": row["operator"],
        "created_at": row["created_at"],
        "started_at": row["started_at"],
        "finished_at": row["finished_at"],
        "error_text": row["error_text"],
    }

    stage_models = []
    for stage in stages:
        stage_models.append(
            BurninStageResponse(
                id=stage["id"],
                stage_name=stage["stage_name"],
                state=stage["state"],
                percent=stage["percent"] or 0,
                started_at=stage["started_at"],
                finished_at=stage["finished_at"],
                error_text=stage["error_text"],
            )
        )
    return BurninJobResponse(stages=stage_models, **job_fields)
def _operator_for(request: Request, _ignored_body_value: str | None = None) -> str:
    """Return the name to record as operator: always the signed-in user.

    The second argument (an ``operator`` value from the request body) is
    accepted but never used, so a client cannot attribute an action to
    someone else. Prefers the user's full name, falling back to the username.
    Raises a 401 HTTPException when no current user is attached.
    """
    current = getattr(request.state, "current_user", None)
    if current:
        return current.full_name or current.username
    raise HTTPException(status_code=401, detail="Authentication required")
@router.post("/api/v1/burnin/start")
async def burnin_start(request: Request, req: StartBurninRequest):
    """Queue a burn-in job for each requested drive.

    Drives are processed independently: successes are collected under
    ``queued`` and per-drive failures under ``errors``. If at least one
    drive failed and none succeeded, the request fails with 409 carrying
    the first error.
    """
    operator = _operator_for(request, req.operator)
    queued: list[dict] = []
    failures: list[dict] = []

    for drive_id in req.drive_ids:
        try:
            job_id = await burnin.start_job(
                drive_id, req.profile, operator, stage_order=req.stage_order
            )
        except burnin.PoolMemberError as exc:
            # Pool-member refusals carry extra fields for the UI.
            failures.append({
                "drive_id": drive_id,
                "error": str(exc),
                "pool_name": exc.pool_name,
                "pool_role": exc.pool_role,
                "pool_locked": True,
            })
        except ValueError as exc:
            failures.append({"drive_id": drive_id, "error": str(exc)})
        else:
            queued.append({"drive_id": drive_id, "job_id": job_id})

    if failures and not queued:
        # Pass the first error's structured fields through so the UI can
        # offer an unlock action instead of only a generic toast.
        raise HTTPException(status_code=409, detail=failures[0])
    return {"queued": queued, "errors": failures}
@router.post("/api/v1/drives/{drive_id}/unlock")
async def unlock_pool_drive(drive_id: int, request: Request, req: UnlockPoolDriveRequest):
    """Grant a temporary pool unlock for a drive.

    Delegates to ``burnin.grant_pool_unlock``; a ``ValueError`` from it is
    reported as 400. The response carries the expiry it returned and the
    TTL currently in effect.
    """
    operator = _operator_for(request, req.operator)
    try:
        expires_at = await burnin.grant_pool_unlock(
            drive_id, req.confirm_token, operator, req.reason,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Look the TTL up on the unlock submodule at call time rather than via
    # the package-root alias bound at import, so a monkey-patched
    # app.burnin.unlock.UNLOCK_TTL_SECONDS is what the response reports.
    ttl_seconds = burnin.unlock.UNLOCK_TTL_SECONDS
    return {"unlocked": True, "expires_at": expires_at, "ttl_seconds": ttl_seconds}
@router.post("/api/v1/burnin/{job_id}/cancel")
async def burnin_cancel(job_id: int, request: Request, req: CancelBurninRequest):
    """Cancel a burn-in job on behalf of the signed-in user.

    Responds 409 when ``burnin.cancel_job`` reports the job could not be
    cancelled, otherwise ``{"cancelled": True}``.
    """
    operator = _operator_for(request, req.operator)
    cancelled = await burnin.cancel_job(job_id, operator)
    if cancelled:
        return {"cancelled": True}
    raise HTTPException(status_code=409, detail="Job not found or not cancellable")
# ---------------------------------------------------------------------------
# History pages
# ---------------------------------------------------------------------------
# Number of burn-in jobs shown per history page (LIMIT of the listing query).
_PAGE_SIZE = 50
# Job states accepted by the history page's ``state`` filter, besides "all".
_ALL_STATES = ("queued", "running", "passed", "failed", "cancelled", "unknown")
# Base listing query for the history page: one row per burn-in job joined
# with its drive, newest job first. duration_seconds is derived in SQL from
# started_at/finished_at via julianday(). `{where}` is filled via
# str.format() with "" or a "WHERE ..." clause; filter values are bound as
# query parameters.
_HISTORY_QUERY = """
SELECT
bj.id, bj.drive_id, bj.profile, bj.state, bj.operator,
bj.created_at, bj.started_at, bj.finished_at, bj.error_text,
d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
{where}
ORDER BY bj.id DESC
"""
def _state_where(state: str) -> tuple[str, list]:
if state == "all":
return "", []
return "WHERE bj.state = ?", [state]
@router.get("/history", response_class=HTMLResponse)
async def history_list(
    request: Request,
    state: str = Query(default="all"),
    page: int = Query(default=1, ge=1),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the paginated burn-in history page.

    ``state`` filters by job state; unrecognised values fall back to "all".
    ``page`` is 1-based and clamped to the last available page. The template
    also receives per-state job counts for the filter badges.
    """
    allowed_states = ("all",) + _ALL_STATES
    if state not in allowed_states:
        state = "all"
    where_clause, filter_params = _state_where(state)

    # Size of the filtered result set, which drives the pagination.
    count_sql = (
        "SELECT COUNT(*) FROM burnin_jobs bj JOIN drives d ON d.id = bj.drive_id "
        + where_clause
    )
    cursor = await db.execute(count_sql, filter_params)
    total_count = (await cursor.fetchone())[0]
    total_pages = max(1, -(-total_count // _PAGE_SIZE))  # ceiling division
    page = min(page, total_pages)
    offset = (page - 1) * _PAGE_SIZE

    # Badge counts: one per state, plus an overall "all" figure.
    cursor = await db.execute(
        "SELECT state, COUNT(*) FROM burnin_jobs GROUP BY state"
    )
    counts = {"all": total_count if state == "all" else 0}
    for state_name, job_count in await cursor.fetchall():
        counts[state_name] = job_count
    if state != "all":
        # The filtered total above is not the overall total; fetch it.
        total_cursor = await db.execute("SELECT COUNT(*) FROM burnin_jobs")
        counts["all"] = (await total_cursor.fetchone())[0]

    # The requested page of job rows.
    page_sql = _HISTORY_QUERY.format(where=where_clause) + " LIMIT ? OFFSET ?"
    cursor = await db.execute(page_sql, filter_params + [_PAGE_SIZE, offset])
    jobs = [dict(record) for record in await cursor.fetchall()]

    poller_state = poller.get_state()
    context = {
        "request": request,
        "jobs": jobs,
        "active_state": state,
        "counts": counts,
        "page": page,
        "total_pages": total_pages,
        "total_count": total_count,
        "poller": poller_state,
    }
    context.update(_stale_context(poller_state))
    return templates.TemplateResponse(request, "history.html", context)
@router.get("/history/{job_id}", response_class=HTMLResponse)
async def history_detail(
    request: Request,
    job_id: int,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the detail page for one burn-in job.

    Loads the job joined with its drive, then all of its stages in id order;
    both carry a ``duration_seconds`` computed in SQL from the start/finish
    timestamps. Responds 404 when the job does not exist.
    """
    # Job + drive info
    cursor = await db.execute("""
SELECT
bj.*, d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id = ?
""", (job_id,))
    job_row = await cursor.fetchone()
    if not job_row:
        raise HTTPException(status_code=404, detail="Burn-in job not found")
    job = dict(job_row)

    # Stages (with duration)
    cursor = await db.execute("""
SELECT *,
CAST(
(julianday(finished_at) - julianday(started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_stages
WHERE burnin_job_id = ?
ORDER BY id
""", (job_id,))
    stage_rows = await cursor.fetchall()
    job["stages"] = [dict(stage) for stage in stage_rows]

    poller_state = poller.get_state()
    context = {
        "request": request,
        "job": job,
        "poller": poller_state,
    }
    context.update(_stale_context(poller_state))
    return templates.TemplateResponse(request, "job_detail.html", context)
# ---------------------------------------------------------------------------
# CSV export
# ---------------------------------------------------------------------------
@router.get("/api/v1/burnin/export.csv")
async def burnin_export_csv(db: aiosqlite.Connection = Depends(get_db)):
    """Export every burn-in job (newest first) as a downloadable CSV file."""
    export_sql = """
SELECT
bj.id AS job_id,
bj.drive_id,
d.devname,
d.serial,
d.model,
bj.profile,
bj.state,
bj.operator,
bj.created_at,
bj.started_at,
bj.finished_at,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds,
bj.error_text
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
ORDER BY bj.id DESC
"""
    # Header order mirrors the SELECT column order above.
    header = [
        "job_id", "drive_id", "devname", "serial", "model",
        "profile", "state", "operator",
        "created_at", "started_at", "finished_at", "duration_seconds",
        "error_text",
    ]

    export_cur = await db.execute(export_sql)
    records = await export_cur.fetchall()

    sink = io.StringIO()
    out = csv.writer(sink)
    out.writerow(header)
    out.writerows(list(record) for record in records)

    # The whole document is built in memory and sent as a single chunk.
    payload = sink.getvalue()
    download_headers = {
        "Content-Disposition": "attachment; filename=burnin_history.csv",
    }
    return StreamingResponse(
        iter([payload]),
        media_type="text/csv",
        headers=download_headers,
    )
# ---------------------------------------------------------------------------
# On-demand email report
# ---------------------------------------------------------------------------
@router.post("/api/v1/report/send")
async def send_report_now():
    """Trigger the daily status email immediately (for testing SMTP config).

    Returns:
        ``{"sent": True, "to": settings.smtp_to}`` once the mailer call
        completes without raising.

    Raises:
        HTTPException: 503 when ``settings.smtp_host`` is empty, 502 when the
            mailer raises while sending (the message is echoed in ``detail``).
    """
    if not settings.smtp_host:
        raise HTTPException(status_code=503, detail="SMTP not configured (SMTP_HOST is empty)")
    try:
        await mailer.send_report_now()
    except Exception as exc:
        # Chain the original error so tracebacks/logs keep the real failure
        # instead of reporting it as "during handling ... another exception".
        raise HTTPException(status_code=502, detail=f"Mail send failed: {exc}") from exc
    return {"sent": True, "to": settings.smtp_to}
# ---------------------------------------------------------------------------
# Drive notes / location update
# ---------------------------------------------------------------------------
@router.patch("/api/v1/drives/{drive_id}")
async def update_drive(
    drive_id: int,
    req: UpdateDriveRequest,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Overwrite the notes and location stored for one drive.

    Responds with 404 when the drive id is unknown; otherwise commits the
    change and returns ``{"updated": True}``.
    """
    lookup = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,))
    existing = await lookup.fetchone()
    if not existing:
        raise HTTPException(status_code=404, detail="Drive not found")

    new_values = (req.notes, req.location, drive_id)
    await db.execute(
        "UPDATE drives SET notes=?, location=? WHERE id=?",
        new_values,
    )
    await db.commit()
    return {"updated": True}
@router.post("/api/v1/drives/{drive_id}/reset")
async def reset_drive(
    drive_id: int,
    request: Request,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """
    Clear SMART test results for a drive so it shows as fresh.

    Refused with 409 while a burn-in job for the drive is queued or running,
    and with 404 when the drive id is unknown. Job history rows are left
    untouched; only the SMART test row, the cached SMART attributes and the
    drive's last_reset_at stamp are changed, and an audit event is recorded.
    """
    drive_cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,))
    drive_row = await drive_cur.fetchone()
    if not drive_row:
        raise HTTPException(status_code=404, detail="Drive not found")

    # A reset must not race with a burn-in that is waiting or in progress.
    active_cur = await db.execute(
        "SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')",
        (drive_id,),
    )
    active_jobs = (await active_cur.fetchone())[0]
    if active_jobs > 0:
        raise HTTPException(status_code=409, detail="Cannot reset while a burn-in is active")

    # The operator is resolved by _operator_for from the request, with the
    # body value passed along as a hint (the JS used to send a literal
    # "operator" because window._operator was never set).
    operator = _operator_for(request, body.get("operator"))

    # 1) SMART test row back to idle.
    smart_reset_sql = """UPDATE smart_tests SET state='idle', percent=0, started_at=NULL,
eta_at=NULL, finished_at=NULL, error_text=NULL, raw_output=NULL
WHERE drive_id=?"""
    await db.execute(smart_reset_sql, (drive_id,))

    # 2) Drop the cached SMART attributes and stamp the reset time
    #    (hides prior burn-in from dashboard).
    reset_stamp = datetime.now(timezone.utc).isoformat()
    await db.execute(
        "UPDATE drives SET smart_attrs=NULL, last_reset_at=? WHERE id=?",
        (reset_stamp, drive_id),
    )

    # 3) Audit trail entry.
    audit_sql = """INSERT INTO audit_events (event_type, drive_id, operator, message)
VALUES (?,?,?,?)"""
    audit_values = ("drive_reset", drive_id, operator, "Drive reset — SMART state cleared")
    await db.execute(audit_sql, audit_values)

    await db.commit()
    poller._notify_subscribers()
    return {"reset": True}
# ---------------------------------------------------------------------------
# Audit log page
# ---------------------------------------------------------------------------
# The 200 most recent audit events, newest id first. drives is LEFT JOINed so
# events whose drive_id matches no drive row are still returned (devname and
# serial are NULL for those).
_AUDIT_QUERY = """
SELECT
ae.id, ae.event_type, ae.operator, ae.message, ae.created_at,
d.devname, d.serial
FROM audit_events ae
LEFT JOIN drives d ON d.id = ae.drive_id
ORDER BY ae.id DESC
LIMIT 200
"""
_AUDIT_EVENT_COLORS = {
"burnin_queued": "yellow",
"burnin_started": "blue",
"burnin_passed": "passed",
"burnin_failed": "failed",
"burnin_cancelled": "cancelled",
"burnin_stuck": "failed",
"burnin_unknown": "unknown",
}
@router.get("/audit", response_class=HTMLResponse)
async def audit_log(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the audit log page from the rows returned by _AUDIT_QUERY."""
    audit_cur = await db.execute(_AUDIT_QUERY)
    events = [dict(record) for record in await audit_cur.fetchall()]

    ps = poller.get_state()
    context = {
        "request": request,
        "events": events,
        "event_colors": _AUDIT_EVENT_COLORS,
        "poller": ps,
        **_stale_context(ps),
    }
    return templates.TemplateResponse(request, "audit.html", context)
# ---------------------------------------------------------------------------
# Stats / analytics page
# ---------------------------------------------------------------------------
@router.get("/stats", response_class=HTMLResponse)
async def stats_page(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the statistics page.

    Template context:
        overall: total/passed/failed/running/cancelled job counts (always
            integers, 0 when there are no jobs).
        by_model: per-model pass/fail counts and pass rate for completed
            jobs, top 20 models by job count.
        by_day: per-day job counts for the last 14 days.
        by_size: job count and average duration in hours per drive size (TB).
        by_failure_stage: failed-job counts grouped by stage name.
        drives_total: number of rows in ``drives``.
    """
    # Overall counts
    cur = await db.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) as passed,
SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN state='running' THEN 1 ELSE 0 END) as running,
SUM(CASE WHEN state='cancelled' THEN 1 ELSE 0 END) as cancelled
FROM burnin_jobs
""")
    overall_row = await cur.fetchone()
    # SUM() over zero rows is NULL in SQLite, so on an empty burnin_jobs table
    # every counter except COUNT(*) would arrive as None. Normalise to 0 so
    # the template always receives integers.
    overall = {
        key: (value if value is not None else 0)
        for key, value in dict(overall_row).items()
    }

    # Pass/fail counts and pass rate by drive model (only completed jobs)
    cur = await db.execute("""
SELECT
COALESCE(d.model, 'Unknown') AS model,
COUNT(*) AS total,
SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) AS passed,
SUM(CASE WHEN bj.state='failed' THEN 1 ELSE 0 END) AS failed,
ROUND(100.0 * SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS pass_rate
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.state IN ('passed', 'failed')
GROUP BY COALESCE(d.model, 'Unknown')
ORDER BY total DESC
LIMIT 20
""")
    by_model = [dict(r) for r in await cur.fetchall()]

    # Activity last 14 days
    cur = await db.execute("""
SELECT
date(created_at) AS day,
COUNT(*) AS total,
SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) AS passed,
SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) AS failed
FROM burnin_jobs
WHERE created_at >= date('now', '-14 days')
GROUP BY date(created_at)
ORDER BY day DESC
""")
    by_day = [dict(r) for r in await cur.fetchall()]

    # Average test duration by drive size (rounded to nearest TB)
    cur = await db.execute("""
SELECT
CAST(ROUND(CAST(d.size_bytes AS REAL) / 1e12) AS INTEGER) AS size_tb,
COUNT(*) AS total,
ROUND(AVG(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400 / 3600.0
), 1) AS avg_hours
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.state IN ('passed', 'failed')
AND bj.started_at IS NOT NULL
AND bj.finished_at IS NOT NULL
GROUP BY size_tb
ORDER BY size_tb
""")
    by_size = [dict(r) for r in await cur.fetchall()]

    # Failure breakdown by stage (which stage caused the failure)
    cur = await db.execute("""
SELECT
COALESCE(bj.stage_name, 'unknown') AS failed_stage,
COUNT(*) AS count
FROM burnin_jobs bj
WHERE bj.state = 'failed'
GROUP BY failed_stage
ORDER BY count DESC
""")
    by_failure_stage = [dict(r) for r in await cur.fetchall()]

    # Drives tracked
    cur = await db.execute("SELECT COUNT(*) FROM drives")
    drives_total = (await cur.fetchone())[0]

    ps = poller.get_state()
    return templates.TemplateResponse(request, "stats.html", {
        "request": request,
        "overall": overall,
        "by_model": by_model,
        "by_day": by_day,
        "by_size": by_size,
        "by_failure_stage": by_failure_stage,
        "drives_total": drives_total,
        "poller": ps,
        **_stale_context(ps),
    })
# ---------------------------------------------------------------------------
# Settings page
# ---------------------------------------------------------------------------
@router.get("/settings", response_class=HTMLResponse)
async def settings_page(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the settings page.

    ``auth.require_admin`` is invoked before anything is read. Only
    non-secret settings are exposed as form values; secrets are represented
    solely by the status strings from ``_secret_status()``.
    """
    auth.require_admin(request)

    # Non-secret settings shown as editable form fields, in display order.
    # smtp_password, ssh_password, ssh_key and truenas_api_key are
    # intentionally absent (sensitive).
    visible_fields = (
        # SMTP
        "smtp_host",
        "smtp_port",
        "smtp_ssl_mode",
        "smtp_timeout",
        "smtp_user",
        "smtp_from",
        "smtp_to",
        "smtp_report_hour",
        "smtp_daily_report_enabled",
        "smtp_alert_on_fail",
        "smtp_alert_on_pass",
        # Webhook
        "webhook_url",
        # Burn-in behaviour
        "stuck_job_hours",
        "max_parallel_burnins",
        "temp_warn_c",
        "temp_crit_c",
        "bad_block_threshold",
        "surface_validate_block_size",
        "surface_validate_block_buffer",
        "surface_validate_passes",
        # SSH credentials (take effect immediately — each SSH call reads live settings)
        "ssh_host",
        "ssh_port",
        "ssh_user",
        # System settings (restart required to fully apply)
        "truenas_base_url",
        "truenas_verify_tls",
        "poll_interval_seconds",
        "stale_threshold_seconds",
        "allowed_ips",
        "log_level",
    )
    editable = {name: getattr(settings, name) for name in visible_fields}
    # An empty SSL mode is presented as the "starttls" default.
    editable["smtp_ssl_mode"] = editable["smtp_ssl_mode"] or "starttls"

    from app import ssh_client as _ssh

    ps = poller.get_state()
    context = {
        "request": request,
        "editable": editable,
        "secret_status": _secret_status(),
        "smtp_enabled": bool(settings.smtp_host),
        "ssh_configured": _ssh.is_configured(),
        "app_version": settings.app_version,
        "poller": ps,
        **_stale_context(ps),
    }
    return templates.TemplateResponse(request, "settings.html", context)
# Field names that hold secrets and must never be rendered to the UI or
# included in the redacted-settings dump verbatim.
_SECRET_FIELDS = ("smtp_password", "ssh_password", "ssh_key", "truenas_api_key")
def _secret_status() -> dict[str, str]:
    """Describe, per secret, whether it is configured without exposing it.

    Every secret maps to "set" or "unset" based on the live settings object.
    ``ssh_key`` additionally reports its source, checked in this order:
    the SSH_KEY environment variable, the value held in settings, then a key
    file on disk (SSH_KEY_FILE, defaulting to the mounted key path).
    """
    import os as _os
    from app.ssh_client import _MOUNTED_KEY_PATH

    def _flag(field: str) -> str:
        # Truthiness of the live setting decides "set" vs "unset".
        return "set" if getattr(settings, field, "") else "unset"

    def _ssh_key_source() -> str:
        # First matching source wins; see the order in the docstring.
        if _os.environ.get("SSH_KEY"):
            return "set (environment variable)"
        if getattr(settings, "ssh_key", ""):
            return "set (stored in settings DB — prefer a mounted secret in production)"
        key_file = _os.environ.get("SSH_KEY_FILE", _MOUNTED_KEY_PATH)
        if _os.path.exists(key_file):
            return "set (mounted secret)"
        return "unset"

    ssh_key_status = _ssh_key_source()
    return {
        "smtp_password": _flag("smtp_password"),
        "ssh_password": _flag("ssh_password"),
        "ssh_key": ssh_key_status,
        "truenas_api_key": _flag("truenas_api_key"),
    }
@router.get("/api/v1/settings/redacted")
async def get_settings_redacted(request: Request):
    """Admin-only diagnostic dump of every editable setting.

    Secret fields are reported as '***' when set and ``None`` when empty, so
    the real values never appear in the response. A ``_secret_status`` entry
    with the per-secret status strings is appended.

    Responds with 401 when there is no current user and 403 when the user is
    not an admin.
    """
    user = request.state.current_user
    if not user:
        raise HTTPException(status_code=401, detail="Authentication required")
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin only")

    def _display(name: str) -> object:
        # Secrets are masked; everything else is passed through untouched.
        value = getattr(settings, name, None)
        if name not in _SECRET_FIELDS:
            return value
        return "***" if value else None

    out: dict[str, object] = {
        name: _display(name) for name in settings_store._EDITABLE
    }
    out["_secret_status"] = _secret_status()
    return out
@router.post("/api/v1/settings")
async def save_settings(request: Request, body: dict):
    """Save editable runtime settings. Secrets are only updated if non-empty.

    Args:
        request: Incoming request; passed to ``auth.require_admin``.
        body: Mapping of setting name to new value. Secret fields sent as an
            empty string are removed before saving so a blank form field
            does not wipe the stored secret.

    Returns:
        ``{"saved": True, "keys": <result of settings_store.save>,
        "rotated_secrets": [...]}``.

    Raises:
        HTTPException: 422 when ``settings_store.save`` raises ValueError.
    """
    user = auth.require_admin(request)
    # Don't overwrite secrets if client sent empty string. Track which
    # ones DID get a real change so we can audit the rotation.
    rotated: list[str] = []
    for secret_field in _SECRET_FIELDS:
        if secret_field not in body:
            continue
        if body[secret_field] == "":
            del body[secret_field]
        else:
            rotated.append(secret_field)
    try:
        saved = settings_store.save(body)
    except ValueError as exc:
        # Chain the cause so the original validation error stays visible in
        # tracebacks instead of being reported as a secondary failure.
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    # Audit secret rotations — never log the value, only the field name +
    # operator + source IP. Lets `audit` page answer "who rotated the
    # SMTP password last week?"
    if rotated and user:
        await auth.audit_auth_event(
            "settings_secret_changed",
            user.username,
            f"Rotated secrets from {_client_ip(request)}: {', '.join(sorted(rotated))}",
        )
    return {"saved": True, "keys": saved, "rotated_secrets": rotated}
@router.post("/api/v1/settings/test-smtp")
async def test_smtp(request: Request):
    """Test the current SMTP configuration without sending an email.

    Returns ``{"ok": True}`` when the mailer reports success.

    Raises:
        HTTPException: 502 carrying the mailer's error text, or a generic
            message when the failed result has no "error" entry.
    """
    auth.require_admin(request)
    result = await mailer.test_smtp_connection()
    if not result["ok"]:
        # .get() with a fallback mirrors the SSH test endpoint and keeps a
        # failure result without an "error" key from surfacing as a 500.
        raise HTTPException(
            status_code=502,
            detail=result.get("error", "SMTP connection failed"),
        )
    return {"ok": True}
@router.post("/api/v1/settings/test-ssh")
async def test_ssh(request: Request):
    """Test the current SSH configuration.

    Returns ``{"ok": True}`` on success; otherwise responds with 502 and the
    reported error text ("Connection failed" when none is given).
    """
    auth.require_admin(request)
    from app import ssh_client

    outcome = await ssh_client.test_connection()
    if outcome["ok"]:
        return {"ok": True}
    failure_detail = outcome.get("error", "Connection failed")
    raise HTTPException(status_code=502, detail=failure_detail)
@router.websocket("/ws/terminal")
async def terminal_ws(websocket: WebSocket):
    """WebSocket endpoint bridging the browser xterm.js terminal to an SSH PTY.

    The connection is handed straight to ``app.terminal.handle``.
    """
    # Imported lazily, at call time, as in the rest of this module.
    from app import terminal

    await terminal.handle(websocket)
@router.get("/api/v1/updates/check")
async def check_updates():
    """Check for a newer release on Forgejo.

    Returns:
        A dict with ``current`` (running version), ``latest`` (latest release
        tag without a leading "v", or ``None``), ``update_available`` (True
        only when a latest tag exists and differs from ``current``) and
        ``message`` (``None`` on success, otherwise why no answer is available).

    This endpoint never raises: HTTP errors, network failures and malformed
    payloads are all reported through ``message``.
    """
    import httpx

    current = settings.app_version

    def _reply(latest=None, update_available=False, message=None) -> dict:
        # Single place that defines the response shape for every outcome.
        return {
            "current": current,
            "latest": latest,
            "update_available": update_available,
            "message": message,
        }

    try:
        async with httpx.AsyncClient(timeout=8.0) as client:
            r = await client.get(
                "https://git.hellocomputer.xyz/api/v1/repos/brandon/truenas-burnin/releases/latest",
                headers={"Accept": "application/json"},
            )
            if r.status_code == 404:
                return _reply(message="No releases published yet")
            if r.status_code != 200:
                return _reply(message=f"Forgejo API returned {r.status_code}")

            data = r.json()
            # A missing, null or non-string tag is treated as "no release
            # information" rather than blowing up and being misreported as a
            # connectivity problem by the handler below.
            tag = data.get("tag_name") if isinstance(data, dict) else None
            if not isinstance(tag, str):
                tag = ""
            # Drop exactly one leading "v" (lstrip would eat every leading "v").
            latest = tag.removeprefix("v")
            return _reply(
                latest=latest or None,
                update_available=bool(latest) and latest != current,
            )
    except Exception as exc:
        # Deliberately best-effort: an update check must never break the UI.
        return _reply(message=f"Could not reach update server: {exc}")
# ---------------------------------------------------------------------------
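# Print view (registration order relative to /history/{job_id} does not matter: a path parameter matches a single segment, so the two routes cannot collide)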
# ---------------------------------------------------------------------------
@router.get("/history/{job_id}/print", response_class=HTMLResponse)
async def history_print(
    request: Request,
    job_id: int,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the print view of one burn-in job and its stages.

    Responds with 404 when no job with ``job_id`` exists.
    """
    # Job row joined to its drive, with the run time in whole seconds.
    job_sql = """
SELECT
bj.*, d.devname, d.serial, d.model, d.size_bytes,
CAST(
(julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id = ?
"""
    # Stages in insertion order, each with its own duration.
    stages_sql = """
SELECT *,
CAST(
(julianday(finished_at) - julianday(started_at)) * 86400
AS INTEGER
) AS duration_seconds
FROM burnin_stages WHERE burnin_job_id=? ORDER BY id
"""
    job_cur = await db.execute(job_sql, (job_id,))
    job_row = await job_cur.fetchone()
    if not job_row:
        raise HTTPException(status_code=404, detail="Job not found")
    job = dict(job_row)

    stages_cur = await db.execute(stages_sql, (job_id,))
    stage_rows = await stages_cur.fetchall()
    job["stages"] = [dict(stage) for stage in stage_rows]

    context = {
        "request": request,
        "job": job,
    }
    return templates.TemplateResponse(request, "job_print.html", context)
# ---------------------------------------------------------------------------
# Burn-in job detail API (must be after export.csv to avoid int coercion)
# ---------------------------------------------------------------------------
@router.get("/api/v1/burnin/{job_id}", response_model=BurninJobResponse)
async def burnin_get(job_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return one burn-in job and its stages as a BurninJobResponse.

    Responds with 404 when no job with ``job_id`` exists.
    """
    # Rows must be aiosqlite.Row objects before they are handed to
    # _row_to_burnin, so the factory is set on this connection first.
    db.row_factory = aiosqlite.Row

    job_cur = await db.execute("SELECT * FROM burnin_jobs WHERE id=?", (job_id,))
    job_row = await job_cur.fetchone()
    if not job_row:
        raise HTTPException(status_code=404, detail="Burn-in job not found")

    stages_cur = await db.execute(
        "SELECT * FROM burnin_stages WHERE burnin_job_id=? ORDER BY id", (job_id,)
    )
    stage_rows = await stages_cur.fetchall()
    return _row_to_burnin(job_row, stage_rows)