nas-burnin/app/burnin/__init__.py
Brandon Walter 1bc1b378ab
Some checks are pending
Security scan / pip-audit (push) Waiting to run
Security scan / bandit (push) Waiting to run
Security scan / gitleaks (push) Waiting to run
Security scan / mypy (push) Waiting to run
fix: cancel-mid-stage marks job 'unknown' not 'failed' (1.0.0-51)
Container restarts (uvicorn shutdown / 'docker compose up -d') were
silently classifying running burn-ins as 'failed' with empty
error_text. Two reasons converged:

1. _stage_surface_validate_ssh caught asyncio.CancelledError at the
   stage level and returned False, *swallowing* the cancel signal.
2. _run_job's outer CancelledError handler then never fired, so
   was_cancelled stayed False and the job got marked 'failed' (the
   "burn-in itself failed" classification) instead of 'unknown'
   (the honest "we don't know whether it would have passed").

Fix:
- Stage now does best-effort kill of remote badblocks (shielded so
  loop shutdown doesn't interrupt the kill), appends an [ABORTED]
  marker to the log, and re-raises CancelledError. _execute_stages
  doesn't catch it (CancelledError is BaseException, not Exception
  in 3.8+) so it propagates up to _run_job.
- _run_job's existing CancelledError handler now also reconciles
  any stage rows still recorded as 'running' by setting them to
  'unknown' with a clear error_text: "Task cancelled mid-run —
  likely container restart or shutdown". The job's error_text gets
  the same message so the drawer's Reason block has something
  specific to display, instead of falling back to the heuristic.

Future container restarts on running burn-ins will now show as
yellow "UNKNOWN" with the explicit cancel reason, matching the
existing behaviour of check_stuck_jobs() for stuck timeouts.
2026-05-09 12:32:46 -07:00

617 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Burn-in orchestrator.
Manages a FIFO queue of burn-in jobs capped at MAX_PARALLEL_BURNINS concurrent
executions. Each job runs stages sequentially; a failed stage aborts the job.
State is persisted to SQLite throughout — DB is source of truth.
On startup:
- Any 'running' jobs from a previous run are marked 'unknown' (interrupted).
- Any 'queued' jobs are re-enqueued automatically.
Cancellation:
- cancel_job() sets DB state to 'cancelled'.
- Running stage coroutines check _is_cancelled() at POLL_INTERVAL boundaries
and abort within a few seconds of the cancel request.
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import aiosqlite
from app.config import settings
from app.truenas import TrueNASClient
log = logging.getLogger(__name__)
# Stage configuration + DB helpers extracted to _common.py in 1.0.0-31.
from ._common import ( # noqa: E402
STAGE_ORDER, _STAGE_BASE_WEIGHTS, POLL_INTERVAL,
_now, _db,
_is_cancelled,
_start_stage, _finish_stage, _cancel_stage, _set_stage_error,
_update_stage_percent, _update_stage_bad_blocks, _append_stage_log,
_store_smart_attrs, _store_smart_raw_output,
_recalculate_progress, _push_update,
)
# ---------------------------------------------------------------------------
# Module-level state (initialized in init())
# ---------------------------------------------------------------------------
# Concurrency limiter for _run_job. Created in init() from
# settings.max_parallel_burnins; None until init() has run (_run_job asserts
# on that).
_semaphore: asyncio.Semaphore | None = None
# TrueNAS client handed to init(); None until init() has run.
_client: TrueNASClient | None = None
# Live job tracking — keeps a strong reference to every _run_job task so it
# isn't garbage-collected (asyncio.create_task only keeps a weak ref) and so
# cancel_job / check_stuck_jobs can actually unwedge a stuck task.
# Keyed by burn-in job id; entries are added and removed by _spawn_run_job.
_active_tasks: dict[int, "asyncio.Task"] = {}
# Remote-PID kill machinery + pool-drive unlock state both live in their
# own submodules. We re-export the names the rest of the app reaches for
# (and keep the _kill_remote_process / _is_unlocked aliases for callers
# that grew up before the split).
from . import kill as _kill # noqa: E402
from . import unlock as _unlock # noqa: E402
# State objects owned by the submodules. These are aliases bound to the same
# objects, not copies.
_remote_pids = _kill._remote_pids
_unlock_grants = _unlock._unlock_grants
# Pool-unlock exception type, constants and helpers re-exported from .unlock.
PoolMemberError = _unlock.PoolMemberError
UNLOCK_TTL_SECONDS = _unlock.UNLOCK_TTL_SECONDS
BOOT_POOL_NAME = _unlock.BOOT_POOL_NAME
BOOT_POOL_CONFIRM_TOKEN = _unlock.BOOT_POOL_CONFIRM_TOKEN
EXPORTED_POOL_ROLE = _unlock.EXPORTED_POOL_ROLE
EXPORTED_CONFIRM_TOKEN = _unlock.EXPORTED_CONFIRM_TOKEN
MOUNTED_ROLE = _unlock.MOUNTED_ROLE
MOUNTED_CONFIRM_TOKEN = _unlock.MOUNTED_CONFIRM_TOKEN
unlock_expiry = _unlock.unlock_expiry
grant_pool_unlock = _unlock.grant_pool_unlock
_is_unlocked = _unlock.is_unlocked # legacy private name
# Remote kill helper from .kill, exposed under its pre-split private name.
_kill_remote_process = _kill.kill_remote_process
# _now() and _db() are re-exported from _common above.
# ---------------------------------------------------------------------------
# Init + startup reconciliation
# ---------------------------------------------------------------------------
async def init(client: TrueNASClient) -> None:
    """Initialise the orchestrator and reconcile state left by a previous run.

    Stores ``client`` and a semaphore sized by ``settings.max_parallel_burnins``
    in module state, then, in a single transaction:

    - marks stage rows still 'running' under a 'running' job as 'unknown';
    - marks every 'running' job as 'unknown' (no task survives a restart, so
      the outcome of that work cannot be known);
    - collects the ids of 'queued' jobs, oldest ``created_at`` first.

    After the commit each queued job is handed to ``_spawn_run_job``.

    Args:
        client: TrueNAS client to keep for the lifetime of the process.
    """
    global _semaphore, _client
    _semaphore = asyncio.Semaphore(settings.max_parallel_burnins)
    _client = client

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")

        interrupted_at = _now()

        # Reconcile stages BEFORE the jobs: the sub-select identifies the
        # interrupted jobs by their 'running' state, which the next statement
        # overwrites. Without this the job reads 'unknown' while its active
        # stage stays 'running' forever — check_stuck_jobs() and _run_job()
        # both reconcile stage rows, startup should agree with them.
        await db.execute(
            """UPDATE burnin_stages
               SET state='unknown', finished_at=?
               WHERE state='running'
                 AND burnin_job_id IN (
                     SELECT id FROM burnin_jobs WHERE state='running'
                 )""",
            (interrupted_at,),
        )

        # Mark interrupted running jobs as unknown
        await db.execute(
            "UPDATE burnin_jobs SET state='unknown', finished_at=? WHERE state='running'",
            (interrupted_at,),
        )

        # Re-enqueue previously queued jobs
        cur = await db.execute(
            "SELECT id FROM burnin_jobs WHERE state='queued' ORDER BY created_at"
        )
        queued = [r["id"] for r in await cur.fetchall()]
        await db.commit()

    for job_id in queued:
        _spawn_run_job(job_id)

    log.info("Burn-in orchestrator ready (max_concurrent=%d)", settings.max_parallel_burnins)
def _spawn_run_job(job_id: int) -> "asyncio.Task":
    """Start ``_run_job(job_id)`` as a task and register it in ``_active_tasks``.

    The registry entry is the strong reference that keeps the task alive (the
    event loop itself only holds a weak one) and is what cancel_job /
    check_stuck_jobs look up when they need to cancel the task directly.

    Returns:
        The newly created task.
    """
    def _forget(finished: "asyncio.Task") -> None:
        # Drop the registry entry only while it still points at this very
        # task, so a task re-registered under the same job id is not evicted.
        if _active_tasks.get(job_id) is finished:
            del _active_tasks[job_id]
        _kill.clear_remote_pid(job_id)

    runner = asyncio.create_task(_run_job(job_id))
    _active_tasks[job_id] = runner
    runner.add_done_callback(_forget)
    return runner
# _kill_remote_process is re-exported above from .kill — the original
# definition was extracted to app/burnin/kill.py in 1.0.0-30.
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def start_job(drive_id: int, profile: str, operator: str,
                    stage_order: list[str] | None = None) -> int:
    """Create and enqueue a burn-in job. Returns the new job ID.

    If stage_order is provided (e.g. ["short_smart","long_smart","surface_validate"]),
    the job runs those stages in that order (precheck and final_check are always
    prepended/appended). Otherwise the preset STAGE_ORDER[profile] is used.

    Args:
        drive_id: ``drives.id`` of the target drive.
        profile: Profile name stored on the job; also the STAGE_ORDER key when
            ``stage_order`` is None.
        operator: Name recorded on the job and its audit event.
        stage_order: Optional custom list of middle stages.

    Returns:
        The id of the inserted ``burnin_jobs`` row.

    Raises:
        ValueError: The drive already has a 'queued' or 'running' job (detected
            either by the up-front count or by the INSERT's IntegrityError).
        PoolMemberError: The cached drive row has a pool and no valid unlock
            grant, or the live SSH check reports the drive in a pool that the
            cached row did not reflect.

    An unknown ``profile`` (with no ``stage_order``) propagates the lookup
    error from ``STAGE_ORDER[profile]``.
    """
    now = _now()

    # Build the actual stage list
    if stage_order is not None:
        stages = ["precheck"] + list(stage_order) + ["final_check"]
    else:
        stages = STAGE_ORDER[profile]

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")

        # Reject duplicate active burn-in for same drive
        cur = await db.execute(
            "SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')",
            (drive_id,),
        )
        if (await cur.fetchone())[0] > 0:
            raise ValueError("Drive already has an active burn-in job")

        # Pool-membership gate: locked unless the operator explicitly
        # unlocked this drive via /api/v1/drives/{id}/unlock recently.
        # _is_unlocked also checks that the grant's stored (pool_name,
        # pool_role) still matches the live row — a grant issued for an
        # exported drive doesn't carry over if the drive turns out to be
        # in an active pool on the next poll.
        cur = await db.execute(
            "SELECT pool_name, pool_role, devname FROM drives WHERE id=?", (drive_id,)
        )
        drow = await cur.fetchone()
        # NOTE(review): a missing drive row is tolerated here (both gates are
        # skipped) — confirm callers always validate drive_id first.
        if drow and drow["pool_name"] and not _is_unlocked(
            drive_id, drow["pool_name"], drow["pool_role"]
        ):
            raise PoolMemberError(drive_id, drow["pool_name"], drow["pool_role"])

        # Closes Codex finding #5: re-check pool state OVER SSH right now,
        # not against cached row. Defends against the 12s poll window
        # where a drive could have been imported into a pool, mounted, or
        # had ZFS labels written between when the operator unlocked it
        # and when they clicked Start. Adds ~200ms per start; cheap
        # against the cost of destroying a freshly-imported pool.
        if drow:
            from app import ssh_client as _ssh
            if _ssh.is_configured():
                fresh = await _ssh.fresh_pool_check_for_drive(drow["devname"])
                cached = (
                    {"pool": drow["pool_name"], "role": drow["pool_role"]}
                    if drow["pool_name"] else None
                )
                if fresh != cached:
                    # State changed since the last poll. Invalidate any
                    # unlock grant (it was bound to stale identity) and
                    # refuse with a descriptive error so the operator
                    # knows to wait for the next poll cycle.
                    _unlock.invalidate_grant(drive_id)
                    fresh_pool = fresh["pool"] if fresh else None
                    fresh_role = fresh["role"] if fresh else None
                    if fresh_pool:
                        raise PoolMemberError(drive_id, fresh_pool, fresh_role)
                    # If the FRESH check shows free but cached said
                    # locked, the drive was just removed from a pool —
                    # safe to start, but invalidate any stale grant so
                    # the operator doesn't reuse old confirmations.
                    log.warning(
                        "Live pool check for drive_id=%d (%s): cached=%s "
                        "fresh=None — drive came free since last poll, "
                        "allowing burn-in",
                        drive_id, drow["devname"], cached,
                    )

        # Create job. The partial unique index uniq_active_burnin_per_drive
        # (database.py) is the actual race-stopper here: if two concurrent
        # /api/v1/burnin/start calls both pass the SELECT-COUNT check above,
        # only one INSERT can win; the loser raises IntegrityError, which
        # we surface with the same ValueError as the inline duplicate check.
        try:
            cur = await db.execute(
                """INSERT INTO burnin_jobs (drive_id, profile, state, percent, operator, created_at)
                   VALUES (?,?,?,?,?,?) RETURNING id""",
                (drive_id, profile, "queued", 0, operator, now),
            )
            job_id = (await cur.fetchone())["id"]
        except aiosqlite.IntegrityError as exc:
            # Chain explicitly so the constraint that actually fired stays
            # attached to the ValueError as its direct cause.
            raise ValueError("Drive already has an active burn-in job") from exc

        # Create stage rows in the desired execution order
        for stage_name in stages:
            await db.execute(
                "INSERT INTO burnin_stages (burnin_job_id, stage_name, state) VALUES (?,?,?)",
                (job_id, stage_name, "pending"),
            )

        await db.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            ("burnin_queued", drive_id, job_id, operator, f"Queued {profile} burn-in"),
        )
        await db.commit()

    _spawn_run_job(job_id)
    log.info("Burn-in job %d queued (drive_id=%d profile=%s operator=%s)",
             job_id, drive_id, profile, operator)
    return job_id
async def cancel_job(job_id: int, operator: str) -> bool:
    """Cancel a queued or running job. Returns True if state was changed.

    Marks the job and its 'pending'/'running' stages as 'cancelled', records a
    ``burnin_cancelled`` audit event, then kills the remote child process and
    cancels the job's asyncio task.

    Args:
        job_id: ``burnin_jobs.id`` of the job to cancel.
        operator: Name recorded on the audit event.

    Returns:
        True if this call moved the job to 'cancelled'; False if the job does
        not exist or is no longer 'queued'/'running'.
    """
    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")

        cur = await db.execute(
            "SELECT state, drive_id FROM burnin_jobs WHERE id=?", (job_id,)
        )
        row = await cur.fetchone()
        if not row or row["state"] not in ("queued", "running"):
            return False

        # Re-assert the state in the UPDATE itself: the job's own task can
        # commit a terminal state (passed/failed/unknown) between the SELECT
        # above and this write, and that result must not be overwritten.
        updated = await db.execute(
            "UPDATE burnin_jobs SET state='cancelled', finished_at=? "
            "WHERE id=? AND state IN ('queued','running')",
            (_now(), job_id),
        )
        if updated.rowcount == 0:
            return False

        await db.execute(
            "UPDATE burnin_stages SET state='cancelled' WHERE burnin_job_id=? AND state IN ('pending','running')",
            (job_id,),
        )
        await db.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            ("burnin_cancelled", row["drive_id"], job_id, operator, "Cancelled by operator"),
        )
        await db.commit()

    # Kill the remote child process FIRST (so proc.wait() in the running task
    # can return), then cancel the task so any other awaits unblock. The task
    # cancel sits in `finally` so a failing kill still propagates to the
    # caller but can no longer leave the task running against a job the DB
    # already records as cancelled.
    try:
        await _kill_remote_process(job_id)
    finally:
        task = _active_tasks.get(job_id)
        if task and not task.done():
            task.cancel()

    log.info("Burn-in job %d cancelled by %s", job_id, operator)
    return True
# ---------------------------------------------------------------------------
# Job runner
# ---------------------------------------------------------------------------
async def _thermal_gate_ok() -> bool:
    """True if it's thermally safe to start a new burn-in.

    Checks the peak temperature of drives currently under active burn-in
    (jobs in state 'running' whose drive has a non-NULL ``temperature_c``)
    against ``settings.temp_warn_c``.

    Returns:
        True when no such temperature exists or the peak is below the
        threshold. Also True when the check itself fails: the gate fails
        open by design so a broken lookup never blocks the queue.
    """
    try:
        async with _db() as db:
            cur = await db.execute("""
                SELECT MAX(d.temperature_c)
                FROM drives d
                JOIN burnin_jobs bj ON bj.drive_id = d.id
                WHERE bj.state = 'running' AND d.temperature_c IS NOT NULL
            """)
            row = await cur.fetchone()
        max_temp = row[0] if row else None
        return max_temp is None or max_temp < settings.temp_warn_c
    except Exception:
        # Never block on error — but leave a trace, since a silently broken
        # gate means burn-ins start regardless of drive temperature.
        log.warning(
            "Thermal gate check failed — allowing burn-in to proceed",
            exc_info=True,
        )
        return True
# Strong references to in-flight notification tasks. asyncio only keeps weak
# references to tasks, so a fire-and-forget task could otherwise be collected
# before it finishes.
_notify_tasks: set["asyncio.Task"] = set()


async def _wait_for_thermal_gate(job_id: int) -> None:
    """Delay a job while running drives are at or above the warning threshold.

    Polls ``_thermal_gate_ok()`` every 10 s and gives up after 18 attempts
    (3 min) so jobs don't queue indefinitely.
    """
    for attempt in range(18):  # 18 × 10 s = 3 min max
        if await _thermal_gate_ok():
            return
        if attempt == 0:
            log.info(
                "Thermal gate: job %d waiting — running drive temps at or above %d°C",
                job_id, settings.temp_warn_c,
            )
        await asyncio.sleep(10)
    log.warning("Thermal gate timed out for job %d — proceeding anyway", job_id)


async def _schedule_job_notifications(job_id: int, devname, serial, model,
                                      final_state: str, error_text) -> None:
    """Fire webhook + immediate email in the background (non-blocking).

    Best effort: any failure while preparing or scheduling the notification
    is logged and swallowed so it cannot affect the already-persisted result.
    """
    try:
        from app import notifier
        async with _db() as db:
            db.row_factory = aiosqlite.Row
            cur = await db.execute(
                "SELECT profile, operator FROM burnin_jobs WHERE id=?", (job_id,)
            )
            job_row = await cur.fetchone()
            if not job_row:
                return
            # Get bad_blocks count from surface_validate stage if present
            cur = await db.execute(
                "SELECT bad_blocks FROM burnin_stages WHERE burnin_job_id=? AND stage_name='surface_validate'",
                (job_id,),
            )
            bb_row = await cur.fetchone()
        bad_blocks = bb_row[0] if bb_row and bb_row[0] else 0

        task = asyncio.create_task(notifier.notify_job_complete(
            job_id=job_id,
            devname=devname,
            serial=serial,
            model=model,
            state=final_state,
            profile=job_row["profile"],
            operator=job_row["operator"],
            error_text=error_text,
            bad_blocks=bad_blocks,
        ))
        _notify_tasks.add(task)
        task.add_done_callback(_notify_tasks.discard)
    except Exception as exc:
        log.error("Failed to schedule notifications: %s", exc)


async def _run_job(job_id: int) -> None:
    """Acquire semaphore slot, execute all stages, persist final state.

    Flow: wait on the thermal gate, take a semaphore slot, move the job from
    'queued' to 'running', run its stages in ``burnin_stages.id`` order, then
    write the terminal state ('passed', 'failed', or 'unknown' when the task
    itself was cancelled), push an SSE alert and schedule notifications.

    Returns silently, leaving the job row untouched, when the job is already
    cancelled, when the job or drive row is missing, or when another actor
    (cancel_job / check_stuck_jobs) has already moved the job out of
    'running' by the time the result is written.
    """
    assert _semaphore is not None, "burnin.init() not called"

    # Adaptive thermal gate: wait before competing for a slot if running drives
    # are already at or above the warning threshold. This prevents layering a
    # new burn-in on top of a thermally-stressed system.
    await _wait_for_thermal_gate(job_id)

    async with _semaphore:
        if await _is_cancelled(job_id):
            return

        # Transition queued → running
        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            row = await (await db.execute(
                "SELECT drive_id, profile FROM burnin_jobs WHERE id=?", (job_id,)
            )).fetchone()
            if not row:
                return
            drive_id, profile = row[0], row[1]

            cur = await db.execute(
                "SELECT devname, serial, model FROM drives WHERE id=?", (drive_id,)
            )
            drive_row = await cur.fetchone()
            if not drive_row:
                return
            devname, drive_serial, drive_model = drive_row[0], drive_row[1], drive_row[2]

            await db.execute(
                "UPDATE burnin_jobs SET state='running', started_at=? WHERE id=?",
                (_now(), job_id),
            )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                ("burnin_started", drive_id, job_id, job_id,
                 f"Started {profile} burn-in on {devname}"),
            )
            # Read stage order from DB (respects any custom order set at job creation)
            stage_cur = await db.execute(
                "SELECT stage_name FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
                (job_id,),
            )
            job_stages = [r[0] for r in await stage_cur.fetchall()]
            await db.commit()

        _push_update()
        log.info("Burn-in started", extra={"job_id": job_id, "devname": devname, "profile": profile})

        success = False
        error_text = None
        was_cancelled = False
        try:
            success = await _execute_stages(job_id, job_stages, devname, drive_id)
        except asyncio.CancelledError:
            # Deliberately not re-raised: the terminal state still has to be
            # persisted below before this task ends.
            was_cancelled = True
        except Exception as exc:
            error_text = str(exc)
            log.exception("Burn-in raised exception", extra={"job_id": job_id, "devname": devname})

        # A cancellation that reaches this point with the job still 'running'
        # came from something other than cancel_job/check_stuck_jobs (shutdown,
        # uvicorn reload, ...). Classifying it as 'failed' would lie — we don't
        # know whether the underlying SMART/badblocks work would have passed.
        if was_cancelled:
            final_state = "unknown"
        else:
            final_state = "passed" if success else "failed"
        cancel_err = (
            "Task cancelled mid-run — likely container restart or shutdown"
            if was_cancelled else None
        )
        # Single source for the reason shown everywhere (DB, SSE alert,
        # notifications) so they cannot disagree.
        final_error = error_text or cancel_err
        finished_at = _now()

        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            # The state guard makes "only finalize a job that is still running"
            # atomic: if cancel_job ('cancelled') or check_stuck_jobs
            # ('unknown') already wrote a terminal state, zero rows match and
            # that audit-meaningful state is left alone.
            updated = await db.execute(
                "UPDATE burnin_jobs SET state=?, percent=?, finished_at=?, error_text=? "
                "WHERE id=? AND state='running'",
                (final_state, 100 if success else None, finished_at,
                 final_error, job_id),
            )
            if updated.rowcount == 0:
                return

            if was_cancelled:
                # CancelledError propagated past _execute_stages, so the
                # active stage row is still 'running'. Only rows still
                # 'running' are touched, so repeating this is harmless.
                await db.execute(
                    """UPDATE burnin_stages
                       SET state='unknown', finished_at=?,
                           error_text=COALESCE(error_text, ?)
                       WHERE burnin_job_id=? AND state='running'""",
                    (finished_at, cancel_err, job_id),
                )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                (f"burnin_{final_state}", drive_id, job_id, job_id,
                 f"Burn-in {final_state} on {devname}"),
            )
            await db.commit()

        # Build SSE alert for browser notifications
        alert = {
            "state": final_state,
            "job_id": job_id,
            "devname": devname,
            "serial": drive_serial,
            "model": drive_model,
            "error_text": final_error,
        }
        _push_update(alert=alert)
        log.info("Burn-in finished", extra={"job_id": job_id, "devname": devname, "state": final_state})

        await _schedule_job_notifications(
            job_id, devname, drive_serial, drive_model, final_state, final_error
        )
async def _execute_stages(job_id: int, stages: list[str], devname: str, drive_id: int) -> bool:
    """Run ``stages`` in order for one job, stopping at the first failure.

    Before each stage the job's cancelled flag is checked. A stage that raises
    an ``Exception`` is recorded as failed with the exception text. A stage
    that returns falsy while the job is cancelled is recorded as cancelled
    rather than failed. ``asyncio.CancelledError`` is not caught here and
    propagates to the caller.

    Args:
        job_id: Job whose stage rows are updated.
        stages: Stage names in execution order.
        devname: Device name passed to each stage.
        drive_id: Drive id passed to each stage.

    Returns:
        True only if every stage reported success; False on cancellation, a
        failed stage, or a stage exception.
    """
    for stage_name in stages:
        if await _is_cancelled(job_id):
            return False

        await _start_stage(job_id, stage_name)
        _push_update()

        try:
            ok = await _dispatch_stage(job_id, stage_name, devname, drive_id)
        except Exception as exc:
            # exc_info keeps the traceback, matching how _run_job logs
            # job-level exceptions.
            log.error(
                "Stage raised exception: %s", exc,
                exc_info=True,
                extra={"job_id": job_id, "devname": devname, "stage": stage_name},
            )
            await _finish_stage(job_id, stage_name, success=False, error_text=str(exc))
            _push_update()
            return False

        if not ok and await _is_cancelled(job_id):
            # Stage was aborted due to cancellation — mark it cancelled, not failed
            await _cancel_stage(job_id, stage_name)
        else:
            await _finish_stage(job_id, stage_name, success=ok)

        await _recalculate_progress(job_id)
        _push_update()

        if not ok:
            return False

    return True
# Per-stage implementations and the dispatch router live in stages.py.
from .stages import ( # noqa: E402
_dispatch_stage,
_badblocks_available,
_nvme_cli_available,
_stage_precheck,
_stage_smart_test,
_stage_smart_test_api,
_stage_smart_test_ssh,
_stage_surface_validate,
_stage_surface_validate_nvme,
_stage_surface_validate_ssh,
_stage_surface_validate_truenas,
_stage_timed_simulate,
_stage_final_check,
)
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
# DB helpers / progress / SSE re-exported from _common above.
# ---------------------------------------------------------------------------
# Stuck-job detection (called by poller every ~5 cycles)
# ---------------------------------------------------------------------------
async def check_stuck_jobs() -> None:
    """Mark jobs that have been 'running' beyond stuck_job_hours as 'unknown'.

    For every job still 'running' whose ``started_at`` is older than
    ``settings.stuck_job_hours``: set the job and its 'running' stages to
    'unknown', record a ``burnin_stuck`` audit event, then kill the remote
    process and cancel the asyncio task so the semaphore slot is released.
    Pushes one SSE update if anything was marked.
    """
    threshold_seconds = settings.stuck_job_hours * 3600
    marked: list[int] = []

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        cur = await db.execute("""
            SELECT bj.id, bj.drive_id, d.devname, bj.started_at
            FROM burnin_jobs bj
            JOIN drives d ON d.id = bj.drive_id
            WHERE bj.state = 'running'
              AND bj.started_at IS NOT NULL
              AND (julianday('now') - julianday(bj.started_at)) * 86400 > ?
        """, (threshold_seconds,))
        stuck = await cur.fetchall()
        if not stuck:
            return

        now = _now()
        for row in stuck:
            job_id, drive_id, devname, started_at = row[0], row[1], row[2], row[3]

            # Guard on state: the job may have finished or been cancelled
            # between the SELECT above and this write; a real result must not
            # be downgraded to 'unknown'.
            updated = await db.execute(
                "UPDATE burnin_jobs SET state='unknown', finished_at=? "
                "WHERE id=? AND state='running'",
                (now, job_id),
            )
            if updated.rowcount == 0:
                continue

            log.critical(
                "Stuck burn-in detected — marking unknown",
                extra={"job_id": job_id, "devname": devname, "started_at": started_at},
            )
            await db.execute(
                """UPDATE burnin_stages SET state='unknown', finished_at=?
                   WHERE burnin_job_id=? AND state='running'""",
                (now, job_id),
            )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,?,?)""",
                ("burnin_stuck", drive_id, job_id, "system",
                 f"Job stuck for >{settings.stuck_job_hours}h — automatically marked unknown"),
            )
            marked.append(job_id)
        await db.commit()

    if not marked:
        return

    # Actually unstick the running tasks so they release their semaphore slot.
    # Without this the DB state becomes 'unknown' but the asyncio task keeps
    # holding the slot forever — which is the bug that left subsequent jobs
    # permanently 'queued' until container restart.
    for job_id in marked:
        try:
            await _kill_remote_process(job_id)
        except Exception:
            # One unreachable host must not stop the remaining stuck tasks
            # from being cancelled.
            log.exception("Failed to kill remote process for stuck job %d", job_id)
        task = _active_tasks.get(job_id)
        if task and not task.done():
            task.cancel()

    _push_update()
    log.warning("Marked %d stuck job(s) as unknown", len(marked))