truenas-burnin/app/burnin.py
Brandon Walter b73b5251ae Initial commit — TrueNAS Burn-In Dashboard v0.5.0
Full-stack burn-in orchestration dashboard (Stages 1–6d complete):
FastAPI backend, SQLite/WAL, SSE live dashboard, mock TrueNAS server,
SMTP/webhook notifications, batch burn-in, settings UI, audit log,
stats page, cancel SMART/burn-in, drag-to-reorder stages.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 00:08:29 -05:00

658 lines
24 KiB
Python

"""
Burn-in orchestrator.
Manages a FIFO queue of burn-in jobs capped at MAX_PARALLEL_BURNINS concurrent
executions. Each job runs stages sequentially; a failed stage aborts the job.
State is persisted to SQLite throughout — DB is source of truth.
On startup:
- Any 'running' jobs from a previous run are marked 'unknown' (interrupted).
- Any 'queued' jobs are re-enqueued automatically.
Cancellation:
- cancel_job() sets DB state to 'cancelled'.
- Running stage coroutines check _is_cancelled() at POLL_INTERVAL boundaries
and abort within a few seconds of the cancel request.
"""
import asyncio
import logging
import time
from datetime import datetime, timezone
import aiosqlite
from app.config import settings
from app.truenas import TrueNASClient
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Stage definitions
# ---------------------------------------------------------------------------
# Preset burn-in profiles: profile name -> ordered list of stage names.
# Every preset begins with "precheck" and ends with "final_check".
STAGE_ORDER: dict[str, list[str]] = {
    # Legacy profile (the only preset that uses io_validate)
    "quick": [
        "precheck",
        "short_smart",
        "io_validate",
        "final_check",
    ],
    # Profiles with one test stage between the bookends
    "surface": [
        "precheck",
        "surface_validate",
        "final_check",
    ],
    "short": [
        "precheck",
        "short_smart",
        "final_check",
    ],
    "long": [
        "precheck",
        "long_smart",
        "final_check",
    ],
    # Profiles with two test stages
    "surface_short": [
        "precheck",
        "surface_validate",
        "short_smart",
        "final_check",
    ],
    "surface_long": [
        "precheck",
        "surface_validate",
        "long_smart",
        "final_check",
    ],
    "short_long": [
        "precheck",
        "short_smart",
        "long_smart",
        "final_check",
    ],
    # All three test stages
    "full": [
        "precheck",
        "surface_validate",
        "short_smart",
        "long_smart",
        "final_check",
    ],
}
# Relative weight of each stage when a job's overall percent is computed.
# Weights are normalised against the stages a job actually contains, so they
# do not need to sum to 100.
_STAGE_BASE_WEIGHTS: dict[str, int] = {
    "precheck": 5,
    "surface_validate": 65,
    "short_smart": 12,
    "long_smart": 13,
    "io_validate": 10,
    "final_check": 5,
}

# Seconds to sleep between progress/cancellation checks inside an active stage.
POLL_INTERVAL = 5.0
# ---------------------------------------------------------------------------
# Module-level state (initialized in init())
# ---------------------------------------------------------------------------
# Limits how many _run_job() coroutines execute stages at once; created in init().
_semaphore: asyncio.Semaphore | None = None
# TrueNAS API client used by the SMART-test stage; assigned in init().
_client: TrueNASClient | None = None
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string (with UTC offset)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _db():
    """Create a new aiosqlite connection object for ``settings.db_path``.

    The result is meant to be used with ``async with``. This helper does not
    set the journal mode itself; callers in this module issue
    ``PRAGMA journal_mode=WAL`` on the connection where they need it.
    """
    db_path = settings.db_path
    return aiosqlite.connect(db_path)
# ---------------------------------------------------------------------------
# Init + startup reconciliation
# ---------------------------------------------------------------------------
async def init(client: TrueNASClient) -> None:
    """Initialise module state and reconcile jobs left over from a previous run.

    Creates the concurrency semaphore (sized by ``settings.max_parallel_burnins``),
    stores *client* for later stage execution, marks every job still recorded
    as 'running' as 'unknown', and schedules a ``_run_job`` task for every job
    still recorded as 'queued' (oldest ``created_at`` first).
    """
    global _semaphore, _client
    _semaphore = asyncio.Semaphore(settings.max_parallel_burnins)
    _client = client

    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute("PRAGMA foreign_keys=ON")

        # A job that was 'running' when the process stopped cannot be resumed.
        await conn.execute(
            "UPDATE burnin_jobs SET state='unknown', finished_at=? WHERE state='running'",
            (_now(),),
        )

        # Jobs that never started are picked up again in creation order.
        cursor = await conn.execute(
            "SELECT id FROM burnin_jobs WHERE state='queued' ORDER BY created_at"
        )
        pending_rows = await cursor.fetchall()
        pending_ids = [row["id"] for row in pending_rows]
        await conn.commit()

    for pending_id in pending_ids:
        asyncio.create_task(_run_job(pending_id))

    log.info("Burn-in orchestrator ready (max_concurrent=%d)", settings.max_parallel_burnins)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def start_job(drive_id: int, profile: str, operator: str,
                    stage_order: list[str] | None = None) -> int:
    """Create and enqueue a burn-in job. Returns the new job ID.

    If stage_order is provided (e.g. ["short_smart","long_smart","surface_validate"]),
    the job runs those stages in that order, with "precheck" prepended and
    "final_check" appended. Repeated entries, and any "precheck"/"final_check"
    entries supplied by the caller, are dropped: stage rows are addressed by
    (burnin_job_id, stage_name) everywhere in this module, so two rows with the
    same name for one job could not be tracked independently.
    Otherwise the preset STAGE_ORDER[profile] is used.

    Raises:
        ValueError: the drive already has a queued or running burn-in job.
        KeyError: stage_order is None and profile is not a key of STAGE_ORDER.
    """
    now = _now()

    # Build the actual stage list
    if stage_order is not None:
        # dict.fromkeys() removes duplicates while keeping first-seen order.
        middle = [
            name for name in dict.fromkeys(stage_order)
            if name not in ("precheck", "final_check")
        ]
        stages = ["precheck", *middle, "final_check"]
    else:
        stages = STAGE_ORDER[profile]

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")

        # Reject duplicate active burn-in for same drive
        cur = await db.execute(
            "SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')",
            (drive_id,),
        )
        if (await cur.fetchone())[0] > 0:
            raise ValueError("Drive already has an active burn-in job")

        # Create job
        cur = await db.execute(
            """INSERT INTO burnin_jobs (drive_id, profile, state, percent, operator, created_at)
               VALUES (?,?,?,?,?,?) RETURNING id""",
            (drive_id, profile, "queued", 0, operator, now),
        )
        job_id = (await cur.fetchone())["id"]

        # Create stage rows in the desired execution order; _run_job() later
        # reads them back ordered by row id.
        for stage_name in stages:
            await db.execute(
                "INSERT INTO burnin_stages (burnin_job_id, stage_name, state) VALUES (?,?,?)",
                (job_id, stage_name, "pending"),
            )

        await db.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            ("burnin_queued", drive_id, job_id, operator, f"Queued {profile} burn-in"),
        )
        await db.commit()

    asyncio.create_task(_run_job(job_id))
    log.info("Burn-in job %d queued (drive_id=%d profile=%s operator=%s)",
             job_id, drive_id, profile, operator)
    return job_id
async def cancel_job(job_id: int, operator: str) -> bool:
    """Cancel a job that is currently queued or running.

    Marks the job and its pending/running stages as 'cancelled' and records a
    'burnin_cancelled' audit event attributed to *operator*.

    Returns:
        True when the job was cancelled; False when the job does not exist or
        is in any state other than 'queued' or 'running'.
    """
    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")

        lookup = await conn.execute(
            "SELECT state, drive_id FROM burnin_jobs WHERE id=?", (job_id,)
        )
        job = await lookup.fetchone()
        if not job:
            return False
        if job["state"] not in ("queued", "running"):
            return False

        await conn.execute(
            "UPDATE burnin_jobs SET state='cancelled', finished_at=? WHERE id=?",
            (_now(), job_id),
        )
        await conn.execute(
            "UPDATE burnin_stages SET state='cancelled' WHERE burnin_job_id=? AND state IN ('pending','running')",
            (job_id,),
        )
        audit_values = (
            "burnin_cancelled",
            job["drive_id"],
            job_id,
            operator,
            "Cancelled by operator",
        )
        await conn.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            audit_values,
        )
        await conn.commit()

    log.info("Burn-in job %d cancelled by %s", job_id, operator)
    return True
# ---------------------------------------------------------------------------
# Job runner
# ---------------------------------------------------------------------------
async def _run_job(job_id: int) -> None:
    """Acquire semaphore slot, execute all stages, persist final state.

    Flow:
      1. Wait for a free slot in the module semaphore.
      2. Move the job from 'queued' to 'running'. The UPDATE is guarded by
         ``state='queued'`` so a cancellation that lands after the initial
         ``_is_cancelled`` check is not overwritten.
      3. Run the job's stages in the order of their ``burnin_stages`` rows.
      4. Unless the job was cancelled, write 'passed' or 'failed', push an SSE
         alert and schedule the completion notification.

    Returns without touching the job if it is cancelled, missing, refers to a
    missing drive, or is no longer 'queued' when its turn arrives.
    """
    assert _semaphore is not None, "burnin.init() not called"

    async with _semaphore:
        if await _is_cancelled(job_id):
            return

        # Transition queued → running
        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")

            job_row = await (await db.execute(
                "SELECT drive_id, profile, operator FROM burnin_jobs WHERE id=?", (job_id,)
            )).fetchone()
            if not job_row:
                return
            drive_id, profile, operator = job_row[0], job_row[1], job_row[2]

            drive_row = await (await db.execute(
                "SELECT devname, serial, model FROM drives WHERE id=?", (drive_id,)
            )).fetchone()
            if not drive_row:
                log.error("Burn-in job %s references missing drive_id=%s — not started",
                          job_id, drive_id)
                return
            devname, drive_serial, drive_model = drive_row[0], drive_row[1], drive_row[2]

            started = await db.execute(
                "UPDATE burnin_jobs SET state='running', started_at=? WHERE id=? AND state='queued'",
                (_now(), job_id),
            )
            if started.rowcount == 0:
                # The job left 'queued' (e.g. was cancelled) after the check
                # above; no row was changed, so there is nothing to commit.
                return

            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                ("burnin_started", drive_id, job_id, job_id, f"Started {profile} burn-in on {devname}"),
            )

            # Read stage order from DB (respects any custom order set at job creation)
            stage_cur = await db.execute(
                "SELECT stage_name FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
                (job_id,),
            )
            job_stages = [r[0] for r in await stage_cur.fetchall()]
            await db.commit()

        _push_update()
        log.info("Burn-in started", extra={"job_id": job_id, "devname": devname, "profile": profile})

        success = False
        error_text = None
        try:
            success = await _execute_stages(job_id, job_stages, devname, drive_id)
        except asyncio.CancelledError:
            # NOTE(review): task cancellation is swallowed here, so the job is
            # recorded as 'failed' below instead of being left 'running'.
            # Kept as in the original; confirm this is the intended behaviour.
            pass
        except Exception as exc:
            error_text = str(exc)
            log.exception("Burn-in raised exception", extra={"job_id": job_id, "devname": devname})

        if await _is_cancelled(job_id):
            return

        final_state = "passed" if success else "failed"
        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            finished = await db.execute(
                "UPDATE burnin_jobs SET state=?, percent=?, finished_at=?, error_text=? "
                "WHERE id=? AND state!='cancelled'",
                (final_state, 100 if success else None, _now(), error_text, job_id),
            )
            if finished.rowcount == 0:
                # Cancelled (or deleted) between the check above and this
                # UPDATE: keep the operator's 'cancelled' state and skip the
                # audit event, alert and notification.
                return
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                (f"burnin_{final_state}", drive_id, job_id, job_id,
                 f"Burn-in {final_state} on {devname}"),
            )
            await db.commit()

        # Build SSE alert for browser notifications
        alert = {
            "state": final_state,
            "job_id": job_id,
            "devname": devname,
            "serial": drive_serial,
            "model": drive_model,
            "error_text": error_text,
        }
        _push_update(alert=alert)
        log.info("Burn-in finished", extra={"job_id": job_id, "devname": devname, "state": final_state})

        # Fire webhook + immediate email in background (non-blocking).
        # profile/operator were read when the job started, so no second query
        # is needed here.
        try:
            from app import notifier
            asyncio.create_task(notifier.notify_job_complete(
                job_id=job_id,
                devname=devname,
                serial=drive_serial,
                model=drive_model,
                state=final_state,
                profile=profile,
                operator=operator,
                error_text=error_text,
            ))
        except Exception as exc:
            log.error("Failed to schedule notifications: %s", exc)
async def _execute_stages(job_id: int, stages: list[str], devname: str, drive_id: int) -> bool:
    """Run *stages* one after another for a job; stop at the first failure.

    For each stage: mark it running, dispatch it, then record it as passed,
    failed, or cancelled (when the stage returned False and the job is now
    cancelled), and refresh the job's overall percent.

    Returns:
        True if every stage passed; False if a stage failed, raised, or the
        job was cancelled.
    """
    for stage_name in stages:
        if await _is_cancelled(job_id):
            return False

        await _start_stage(job_id, stage_name)
        _push_update()

        try:
            ok = await _dispatch_stage(job_id, stage_name, devname, drive_id)
        except Exception as exc:
            log.error("Stage raised exception: %s", exc,
                      extra={"job_id": job_id, "devname": devname, "stage": stage_name})
            await _finish_stage(job_id, stage_name, success=False, error_text=str(exc))
            _push_update()
            return False

        if not ok and await _is_cancelled(job_id):
            # Stage was aborted due to cancellation — mark it cancelled, not failed
            await _cancel_stage(job_id, stage_name)
        else:
            await _finish_stage(job_id, stage_name, success=ok)

        # The original passed an undefined name `profile` here, which raised
        # NameError as soon as the first stage completed. The second argument
        # of _recalculate_progress is unused, so it is simply omitted.
        await _recalculate_progress(job_id)
        _push_update()

        if not ok:
            return False

    return True
async def _dispatch_stage(job_id: int, stage_name: str, devname: str, drive_id: int) -> bool:
if stage_name == "precheck":
return await _stage_precheck(job_id, drive_id)
elif stage_name == "short_smart":
return await _stage_smart_test(job_id, devname, "SHORT", "short_smart")
elif stage_name == "long_smart":
return await _stage_smart_test(job_id, devname, "LONG", "long_smart")
elif stage_name == "surface_validate":
return await _stage_timed_simulate(job_id, "surface_validate", settings.surface_validate_seconds)
elif stage_name == "io_validate":
return await _stage_timed_simulate(job_id, "io_validate", settings.io_validate_seconds)
elif stage_name == "final_check":
return await _stage_final_check(job_id, devname)
return True
# ---------------------------------------------------------------------------
# Individual stage implementations
# ---------------------------------------------------------------------------
async def _stage_precheck(job_id: int, drive_id: int) -> bool:
    """Gate the burn-in on the drive's recorded SMART health and temperature.

    Reads ``smart_health`` and ``temperature_c`` from the ``drives`` table.
    Fails (returns False) when the drive row is missing, when health is
    "FAILED", or when the temperature is above 60; the latter two also store
    an error message on the precheck stage. Otherwise waits one second and
    returns True.
    """
    async with _db() as conn:
        cursor = await conn.execute(
            "SELECT smart_health, temperature_c FROM drives WHERE id=?", (drive_id,)
        )
        drive = await cursor.fetchone()

    if not drive:
        return False

    health = drive[0]
    temp = drive[1]

    problem = None
    if health == "FAILED":
        problem = "Drive SMART health is FAILED — refusing to burn in"
    elif temp and temp > 60:
        problem = f"Drive temperature {temp}°C exceeds 60°C limit"

    if problem is not None:
        await _set_stage_error(job_id, "precheck", problem)
        return False

    await asyncio.sleep(1)  # Simulate brief check
    return True
async def _stage_smart_test(job_id: int, devname: str, test_type: str, stage_name: str) -> bool:
    """Start a TrueNAS SMART test and poll until complete.

    Every POLL_INTERVAL seconds the job list is fetched from the client, the
    stage percent and overall job percent are refreshed, and cancellation is
    checked. On cancellation the TrueNAS job is aborted on a best-effort basis.

    Returns:
        True when the TrueNAS job reaches "SUCCESS"; False when it reaches
        "FAILED"/"ABORTED", disappears from the job list, or the burn-in job
        is cancelled.

    Raises:
        RuntimeError: the module client has not been set by init().
    """
    if _client is None:
        raise RuntimeError("burnin.init() not called")

    tn_job_id = await _client.start_smart_test([devname], test_type)

    while True:
        if await _is_cancelled(job_id):
            try:
                await _client.abort_job(tn_job_id)
            except Exception as exc:
                # Best effort: the burn-in is cancelled either way, but leave
                # a trace so a still-running SMART test can be diagnosed.
                log.warning("Failed to abort TrueNAS job %s: %s", tn_job_id, exc,
                            extra={"job_id": job_id, "devname": devname})
            return False

        jobs = await _client.get_smart_jobs()
        job = next((j for j in jobs if j["id"] == tn_job_id), None)
        if not job:
            # Record why the stage failed instead of failing with no message.
            await _set_stage_error(
                job_id, stage_name,
                f"SMART {test_type} test job {tn_job_id} is no longer reported by TrueNAS",
            )
            return False

        state = job["state"]
        # Tolerate a missing or null progress/percent by reporting 0.
        pct = (job.get("progress") or {}).get("percent") or 0

        await _update_stage_percent(job_id, stage_name, pct)
        await _recalculate_progress(job_id, None)
        _push_update()

        if state == "SUCCESS":
            return True
        elif state in ("FAILED", "ABORTED"):
            await _set_stage_error(job_id, stage_name,
                                   job.get("error") or f"SMART {test_type} test failed")
            return False

        await asyncio.sleep(POLL_INTERVAL)
async def _stage_timed_simulate(job_id: int, stage_name: str, duration_seconds: int) -> bool:
    """Simulate a timed stage (surface validation / IO validation) with progress updates.

    Progress is the elapsed fraction of *duration_seconds*, written to the
    stage row every POLL_INTERVAL seconds. A duration of zero or less counts
    as already complete; the original divided by it and raised
    ZeroDivisionError for 0.

    Returns:
        True once progress reaches 100; False if the job is cancelled first.
    """
    start = time.monotonic()

    while True:
        if await _is_cancelled(job_id):
            return False

        if duration_seconds <= 0:
            pct = 100
        else:
            elapsed = time.monotonic() - start
            pct = min(100, int(elapsed / duration_seconds * 100))

        await _update_stage_percent(job_id, stage_name, pct)
        await _recalculate_progress(job_id, None)
        _push_update()

        if pct >= 100:
            return True

        await asyncio.sleep(POLL_INTERVAL)
async def _stage_final_check(job_id: int, devname: str) -> bool:
    """Verify drive passed all tests by checking current SMART health in DB.

    Waits one second, then reads ``smart_health`` for *devname* from the
    ``drives`` table.

    Returns:
        False (with an error stored on the final_check stage) when the drive
        row cannot be found or its health is "FAILED"; True otherwise.
    """
    await asyncio.sleep(1)

    async with _db() as db:
        cur = await db.execute(
            "SELECT smart_health FROM drives WHERE devname=?", (devname,)
        )
        row = await cur.fetchone()

    if not row:
        # A missing row is a different problem from a failed drive; the
        # original reported both with the "health is FAILED" message.
        await _set_stage_error(job_id, "final_check",
                               f"Drive {devname} not found in database after burn-in")
        return False

    if row[0] == "FAILED":
        await _set_stage_error(job_id, "final_check", "Drive SMART health is FAILED after burn-in")
        return False

    return True
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
async def _is_cancelled(job_id: int) -> bool:
    """Return True when the job row exists and its state is 'cancelled'."""
    async with _db() as conn:
        cursor = await conn.execute("SELECT state FROM burnin_jobs WHERE id=?", (job_id,))
        record = await cursor.fetchone()

    if not record:
        return False
    return record[0] == "cancelled"
async def _start_stage(job_id: int, stage_name: str) -> None:
    """Mark a stage as running and record it as the job's current stage."""
    started_at = _now()
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        stage_params = (started_at, job_id, stage_name)
        await conn.execute(
            "UPDATE burnin_stages SET state='running', started_at=? WHERE burnin_job_id=? AND stage_name=?",
            stage_params,
        )
        job_params = (stage_name, job_id)
        await conn.execute(
            "UPDATE burnin_jobs SET stage_name=? WHERE id=?",
            job_params,
        )
        await conn.commit()
async def _finish_stage(job_id: int, stage_name: str, success: bool, error_text: str | None = None) -> None:
    """Record the outcome of a stage.

    Sets the stage state to 'passed' or 'failed', its percent to 100 on
    success (NULL otherwise), its finish time, and its duration in seconds
    computed from the stored ``started_at`` (left NULL when that value is
    absent or cannot be parsed). ``error_text`` is written only when one is
    supplied, so a message the stage stored earlier is preserved.
    """
    finished_at = _now()
    outcome = "passed" if success else "failed"
    final_percent = 100 if success else None

    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")

        cursor = await conn.execute(
            "SELECT started_at FROM burnin_stages WHERE burnin_job_id=? AND stage_name=?",
            (job_id, stage_name),
        )
        stage_row = await cursor.fetchone()

        elapsed = None
        if stage_row and stage_row[0]:
            try:
                began = datetime.fromisoformat(stage_row[0])
                # Timestamps without an offset are interpreted as UTC.
                if began.tzinfo is None:
                    began = began.replace(tzinfo=timezone.utc)
                elapsed = (datetime.now(timezone.utc) - began).total_seconds()
            except Exception:
                pass

        if error_text is None:
            # No new message: leave whatever error_text the stage already has.
            await conn.execute(
                """UPDATE burnin_stages
                   SET state=?, percent=?, finished_at=?, duration_seconds=?
                   WHERE burnin_job_id=? AND stage_name=?""",
                (outcome, final_percent, finished_at, elapsed, job_id, stage_name),
            )
        else:
            await conn.execute(
                """UPDATE burnin_stages
                   SET state=?, percent=?, finished_at=?, duration_seconds=?, error_text=?
                   WHERE burnin_job_id=? AND stage_name=?""",
                (outcome, final_percent, finished_at, elapsed, error_text, job_id, stage_name),
            )

        await conn.commit()
async def _update_stage_percent(job_id: int, stage_name: str, pct: int) -> None:
    """Store *pct* as the progress of the given stage of the given job."""
    params = (pct, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET percent=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
async def _cancel_stage(job_id: int, stage_name: str) -> None:
    """Mark a stage as cancelled and stamp its finish time."""
    finished_at = _now()
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        params = (finished_at, job_id, stage_name)
        await conn.execute(
            "UPDATE burnin_stages SET state='cancelled', finished_at=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
async def _set_stage_error(job_id: int, stage_name: str, error_text: str) -> None:
    """Store an error message on a stage without changing its state."""
    params = (error_text, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET error_text=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
async def _recalculate_progress(job_id: int, profile: str | None = None) -> None:
    """Recompute a job's overall percent from its stage rows.

    Each stage contributes its weight from ``_STAGE_BASE_WEIGHTS`` (5 for an
    unlisted stage): fully when 'passed', proportionally to its own percent
    when 'running', and nothing otherwise. The job's ``stage_name`` is set to
    the running stage, or NULL when no stage is running. *profile* is unused
    and accepted only for compatibility with existing callers.
    """
    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")

        cursor = await conn.execute(
            "SELECT stage_name, state, percent FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
            (job_id,),
        )
        stage_rows = await cursor.fetchall()
        if not stage_rows:
            return

        weights = [_STAGE_BASE_WEIGHTS.get(row["stage_name"], 5) for row in stage_rows]
        weight_sum = sum(weights)
        if weight_sum == 0:
            return

        earned = 0.0
        active_stage = None
        for row, weight in zip(stage_rows, weights):
            stage_state = row["state"]
            if stage_state == "passed":
                earned += weight
            elif stage_state == "running":
                earned += weight * (row["percent"] or 0) / 100
                active_stage = row["stage_name"]

        overall = int(earned / weight_sum * 100)
        await conn.execute(
            "UPDATE burnin_jobs SET percent=?, stage_name=? WHERE id=?",
            (overall, active_stage, job_id),
        )
        await conn.commit()
# ---------------------------------------------------------------------------
# SSE push
# ---------------------------------------------------------------------------
def _push_update(alert: dict | None = None) -> None:
    """Notify SSE subscribers that data has changed, with optional browser notification payload.

    Best effort: a failure to import the poller or to notify subscribers never
    propagates to the caller. It is logged at debug level rather than being
    discarded silently, so a broken push path can still be diagnosed.
    """
    try:
        # Imported at call time rather than at module import.
        # NOTE(review): presumably to avoid a circular import with app.poller — confirm.
        from app import poller
        poller._notify_subscribers(alert=alert)
    except Exception:
        log.debug("SSE push failed", exc_info=True)
# ---------------------------------------------------------------------------
# Stuck-job detection (called by poller every ~5 cycles)
# ---------------------------------------------------------------------------
async def check_stuck_jobs() -> None:
    """Mark jobs that have been 'running' beyond stuck_job_hours as 'unknown'.

    Selects running jobs whose ``started_at`` is older than
    ``settings.stuck_job_hours``. Each is updated to 'unknown' only if it is
    still 'running' at UPDATE time, so a job that finished after the SELECT
    keeps its real result. An audit event is written per job actually marked,
    and SSE subscribers are notified when at least one job was marked.
    """
    threshold_seconds = settings.stuck_job_hours * 3600
    marked = 0

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")

        cur = await db.execute("""
            SELECT bj.id, bj.drive_id, d.devname, bj.started_at
            FROM burnin_jobs bj
            JOIN drives d ON d.id = bj.drive_id
            WHERE bj.state = 'running'
              AND bj.started_at IS NOT NULL
              AND (julianday('now') - julianday(bj.started_at)) * 86400 > ?
        """, (threshold_seconds,))
        stuck = await cur.fetchall()

        if not stuck:
            return

        now = _now()
        for row in stuck:
            job_id, drive_id, devname, started_at = row[0], row[1], row[2], row[3]

            updated = await db.execute(
                "UPDATE burnin_jobs SET state='unknown', finished_at=? WHERE id=? AND state='running'",
                (now, job_id),
            )
            if updated.rowcount == 0:
                # State changed since the SELECT; leave the job alone.
                continue
            marked += 1

            log.critical(
                "Stuck burn-in detected — marking unknown",
                extra={"job_id": job_id, "devname": devname, "started_at": started_at},
            )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,?,?)""",
                ("burnin_stuck", drive_id, job_id, "system",
                 f"Job stuck for >{settings.stuck_job_hours}h — automatically marked unknown"),
            )

        await db.commit()

    if marked == 0:
        return

    _push_update()
    log.warning("Marked %d stuck job(s) as unknown", marked)