SSH (app/ssh_client.py — new):
- asyncssh-based client: start_smart_test, poll_smart_progress, abort_smart_test,
get_smart_attributes, run_badblocks with streaming progress callbacks
- SMART attribute table: monitors attrs 5/10/188/197/198/199 for warn/fail thresholds
- Falls back to REST API / mock simulation when ssh_host is not configured
Burn-in stages updated (burnin.py):
- _stage_smart_test: SSH path polls smartctl -a, stores raw output + parsed attributes
- _stage_surface_validate: SSH path streams badblocks, counts bad blocks vs configurable threshold
- _stage_final_check: SSH path checks smartctl attributes; DB fallback for mock mode
- New DB helpers: _append_stage_log, _update_stage_bad_blocks, _store_smart_attrs,
_store_smart_raw_output
Database (database.py):
- Migrations: burnin_stages.log_text, burnin_stages.bad_blocks,
drives.smart_attrs (JSON), smart_tests.raw_output
Settings (config.py + settings_store.py):
- ssh_host, ssh_port, ssh_user, ssh_password, ssh_key — all runtime-editable
- SSH section in Settings UI with Test SSH Connection button
Webhook (notifier.py):
- Added bad_blocks and timestamp fields to payload per SPEC
Drive reset (routes.py + drives_table.html):
- POST /api/v1/drives/{id}/reset — clears SMART state, smart_attrs; audit logged
- Reset button visible on drives with completed test state (no active burn-in)
Log drawer (app.js):
- Burn-In tab: shows raw stage log_text (SSH output) with bad block highlighting
- SMART tab: shows SMART attribute table with warn/fail colouring + raw smartctl output
Polish:
- Version badge (v1.0.0-6d) in header via Jinja2 global
- Parallel burn-in warning when max_parallel_burnins > 8 in Settings
- Stats page: avg duration by drive size + failure breakdown by stage
- settings.html: SSH section with key textarea, parallel warn div
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
955 lines
36 KiB
Python
955 lines
36 KiB
Python
"""
|
|
Burn-in orchestrator.
|
|
|
|
Manages a FIFO queue of burn-in jobs capped at MAX_PARALLEL_BURNINS concurrent
|
|
executions. Each job runs stages sequentially; a failed stage aborts the job.
|
|
|
|
State is persisted to SQLite throughout — DB is source of truth.
|
|
|
|
On startup:
|
|
- Any 'running' jobs from a previous run are marked 'unknown' (interrupted).
|
|
- Any 'queued' jobs are re-enqueued automatically.
|
|
|
|
Cancellation:
|
|
- cancel_job() sets DB state to 'cancelled'.
|
|
- Running stage coroutines check _is_cancelled() at POLL_INTERVAL boundaries
|
|
and abort within a few seconds of the cancel request.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
import aiosqlite
|
|
|
|
from app.config import settings
|
|
from app.truenas import TrueNASClient
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage definitions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Every profile is bracketed by "precheck" and "final_check"; only the middle
# stages differ, so the table below lists just those and the comprehension
# adds the bracket.
STAGE_ORDER: dict[str, list[str]] = {
    profile: ["precheck", *middle, "final_check"]
    for profile, middle in (
        # Legacy
        ("quick", ("short_smart", "io_validate")),
        # Single-stage selectable profiles
        ("surface", ("surface_validate",)),
        ("short", ("short_smart",)),
        ("long", ("long_smart",)),
        # Two-stage combos
        ("surface_short", ("surface_validate", "short_smart")),
        ("surface_long", ("surface_validate", "long_smart")),
        ("short_long", ("short_smart", "long_smart")),
        # All three
        ("full", ("surface_validate", "short_smart", "long_smart")),
    )
}

# Per-stage base weights used to compute overall job % progress dynamically.
# A stage name missing from this table is weighted 5 by _recalculate_progress.
_STAGE_BASE_WEIGHTS: dict[str, int] = dict(
    precheck=5,
    surface_validate=65,
    short_smart=12,
    long_smart=13,
    io_validate=10,
    final_check=5,
)

POLL_INTERVAL = 5.0  # seconds between progress checks during active stages
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level state (initialized in init())
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Both values stay None until init() runs; _run_job asserts the semaphore is set.
# Limits how many burn-in jobs execute at once (sized from settings.max_parallel_burnins).
_semaphore: asyncio.Semaphore | None = None
# TrueNAS client handed to init(); used by the REST-API SMART test path.
_client: TrueNASClient | None = None
|
|
|
|
|
|
def _now() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
|
|
|
|
|
|
def _db():
    """Create a fresh aiosqlite connection to ``settings.db_path``.

    Caller must use 'async with'. This helper does not set any pragma itself;
    callers in this module that need WAL mode issue
    ``PRAGMA journal_mode=WAL`` after entering the context.
    """
    db_path = settings.db_path
    return aiosqlite.connect(db_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Init + startup reconciliation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def init(client: TrueNASClient) -> None:
    """Initialise module state and reconcile jobs left over from a previous run.

    Stores ``client`` and creates the concurrency semaphore, then in one
    transaction marks every job still in state 'running' as 'unknown' and
    collects the ids of 'queued' jobs (oldest first) so they can be scheduled
    again.
    """
    global _semaphore, _client
    _client = client
    _semaphore = asyncio.Semaphore(settings.max_parallel_burnins)

    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
            await conn.execute(pragma)

        # A job that was 'running' when the process stopped cannot be resumed.
        await conn.execute(
            "UPDATE burnin_jobs SET state='unknown', finished_at=? WHERE state='running'",
            (_now(),),
        )

        # Jobs that never started are simply scheduled again, in creation order.
        cursor = await conn.execute(
            "SELECT id FROM burnin_jobs WHERE state='queued' ORDER BY created_at"
        )
        pending_rows = await cursor.fetchall()
        await conn.commit()

    for pending in pending_rows:
        # NOTE(review): the task object is not retained; asyncio documents that
        # callers should keep a reference to tasks they create.
        asyncio.create_task(_run_job(pending["id"]))

    log.info("Burn-in orchestrator ready (max_concurrent=%d)", settings.max_parallel_burnins)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def start_job(drive_id: int, profile: str, operator: str,
                    stage_order: list[str] | None = None) -> int:
    """Create and enqueue a burn-in job. Returns the new job ID.

    If stage_order is provided (e.g. ["short_smart","long_smart","surface_validate"]),
    the job runs those stages in that order; precheck and final_check are always
    prepended/appended. Any "precheck"/"final_check" entries and repeated names
    inside stage_order are dropped, because stage rows are addressed by
    (job id, stage name) and duplicates would be updated together.
    Otherwise the preset STAGE_ORDER[profile] is used.

    Raises:
        ValueError: the drive already has a queued or running burn-in job.
        KeyError: stage_order is None and profile is not a key of STAGE_ORDER.
    """
    now = _now()

    # Build the actual stage list
    if stage_order is not None:
        # dict.fromkeys() removes repeats while keeping first-seen order.
        middle = [
            name for name in dict.fromkeys(stage_order)
            if name not in ("precheck", "final_check")
        ]
        stages = ["precheck", *middle, "final_check"]
    else:
        stages = STAGE_ORDER[profile]

    async with _db() as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")

        # Reject duplicate active burn-in for same drive
        cur = await db.execute(
            "SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')",
            (drive_id,),
        )
        if (await cur.fetchone())[0] > 0:
            raise ValueError("Drive already has an active burn-in job")

        # Create job
        cur = await db.execute(
            """INSERT INTO burnin_jobs (drive_id, profile, state, percent, operator, created_at)
               VALUES (?,?,?,?,?,?) RETURNING id""",
            (drive_id, profile, "queued", 0, operator, now),
        )
        job_id = (await cur.fetchone())["id"]

        # Create stage rows in the desired execution order; _run_job later
        # reads them back ordered by row id.
        for stage_name in stages:
            await db.execute(
                "INSERT INTO burnin_stages (burnin_job_id, stage_name, state) VALUES (?,?,?)",
                (job_id, stage_name, "pending"),
            )

        await db.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            ("burnin_queued", drive_id, job_id, operator, f"Queued {profile} burn-in"),
        )
        await db.commit()

    # The job waits for a semaphore slot inside _run_job.
    asyncio.create_task(_run_job(job_id))
    log.info("Burn-in job %d queued (drive_id=%d profile=%s operator=%s)",
             job_id, drive_id, profile, operator)
    return job_id
|
|
|
|
|
|
async def cancel_job(job_id: int, operator: str) -> bool:
    """Cancel a queued or running job. Returns True if state was changed.

    Marks the job and its pending/running stages as 'cancelled' and writes a
    'burnin_cancelled' audit event. Returns False when the job does not exist
    or is in any other state.
    """
    cancellable_states = ("queued", "running")

    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")

        lookup = await conn.execute(
            "SELECT state, drive_id FROM burnin_jobs WHERE id=?", (job_id,)
        )
        job = await lookup.fetchone()
        if not job or job["state"] not in cancellable_states:
            return False

        # Running stage coroutines notice this state via _is_cancelled().
        await conn.execute(
            "UPDATE burnin_jobs SET state='cancelled', finished_at=? WHERE id=?",
            (_now(), job_id),
        )
        await conn.execute(
            "UPDATE burnin_stages SET state='cancelled' WHERE burnin_job_id=? AND state IN ('pending','running')",
            (job_id,),
        )
        audit_values = (
            "burnin_cancelled", job["drive_id"], job_id, operator, "Cancelled by operator",
        )
        await conn.execute(
            """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            audit_values,
        )
        await conn.commit()

    log.info("Burn-in job %d cancelled by %s", job_id, operator)
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Job runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _run_job(job_id: int) -> None:
    """Acquire semaphore slot, execute all stages, persist final state.

    Flow:
      1. Wait for a semaphore slot; return at once if the job was cancelled
         while waiting, or if the job or its drive row no longer exists.
      2. Mark the job 'running', write a 'burnin_started' audit event and read
         the stage order from the burnin_stages rows.
      3. Run the stages. An exception is logged and stored as the job's
         error_text.
      4. Unless the job was cancelled meanwhile, store 'passed'/'failed', push
         an SSE alert and schedule the completion notification.
    """
    assert _semaphore is not None, "burnin.init() not called"

    async with _semaphore:
        if await _is_cancelled(job_id):
            return

        # Transition queued → running
        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            row = await (await db.execute(
                "SELECT drive_id, profile FROM burnin_jobs WHERE id=?", (job_id,)
            )).fetchone()
            if not row:
                return
            drive_id, profile = row[0], row[1]

            cur = await db.execute("SELECT devname, serial, model FROM drives WHERE id=?", (drive_id,))
            drive_row = await cur.fetchone()
            if not drive_row:
                return
            devname, drive_serial, drive_model = drive_row[0], drive_row[1], drive_row[2]

            await db.execute(
                "UPDATE burnin_jobs SET state='running', started_at=? WHERE id=?",
                (_now(), job_id),
            )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                ("burnin_started", drive_id, job_id, job_id, f"Started {profile} burn-in on {devname}"),
            )
            # Read stage order from DB (respects any custom order set at job creation)
            stage_cur = await db.execute(
                "SELECT stage_name FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
                (job_id,),
            )
            job_stages = [r[0] for r in await stage_cur.fetchall()]
            await db.commit()

        _push_update()
        log.info("Burn-in started", extra={"job_id": job_id, "devname": devname, "profile": profile})

        success = False
        error_text = None
        try:
            success = await _execute_stages(job_id, job_stages, devname, drive_id)
        except asyncio.CancelledError:
            # NOTE(review): task cancellation is swallowed here, so the job is
            # recorded as 'failed' below unless its DB state is 'cancelled'.
            pass
        except Exception as exc:
            error_text = str(exc)
            log.exception("Burn-in raised exception", extra={"job_id": job_id, "devname": devname})

        # cancel_job() already wrote the final state; do not overwrite it.
        if await _is_cancelled(job_id):
            return

        final_state = "passed" if success else "failed"
        async with _db() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute(
                "UPDATE burnin_jobs SET state=?, percent=?, finished_at=?, error_text=? WHERE id=?",
                (final_state, 100 if success else None, _now(), error_text, job_id),
            )
            await db.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,(SELECT operator FROM burnin_jobs WHERE id=?),?)""",
                (f"burnin_{final_state}", drive_id, job_id, job_id,
                 f"Burn-in {final_state} on {devname}"),
            )
            await db.commit()

        # Build SSE alert for browser notifications
        alert = {
            "state": final_state,
            "job_id": job_id,
            "devname": devname,
            "serial": drive_serial,
            "model": drive_model,
            "error_text": error_text,
        }
        _push_update(alert=alert)
        log.info("Burn-in finished", extra={"job_id": job_id, "devname": devname, "state": final_state})

        # Fire webhook + immediate email in background (non-blocking)
        try:
            from app import notifier

            # Both lookups are read-only, so one connection serves them.
            async with _db() as db2:
                db2.row_factory = aiosqlite.Row
                job_row = await (await db2.execute(
                    "SELECT profile, operator FROM burnin_jobs WHERE id=?", (job_id,)
                )).fetchone()
                bb_row = None
                if job_row:
                    # Get bad_blocks count from surface_validate stage if present
                    bb_row = await (await db2.execute(
                        "SELECT bad_blocks FROM burnin_stages WHERE burnin_job_id=? AND stage_name='surface_validate'",
                        (job_id,),
                    )).fetchone()

            if job_row:
                # A missing stage row or a NULL count is reported as 0.
                bad_blocks = (bb_row[0] if bb_row else 0) or 0
                asyncio.create_task(notifier.notify_job_complete(
                    job_id=job_id,
                    devname=devname,
                    serial=drive_serial,
                    model=drive_model,
                    state=final_state,
                    profile=job_row["profile"],
                    operator=job_row["operator"],
                    error_text=error_text,
                    bad_blocks=bad_blocks,
                ))
        except Exception as exc:
            # Notification problems must not change the recorded job result.
            log.error("Failed to schedule notifications: %s", exc)
|
|
|
|
|
|
async def _execute_stages(job_id: int, stages: list[str], devname: str, drive_id: int) -> bool:
    """Run ``stages`` in order and return True only if every one succeeds.

    Stops at the first stage that fails, raises, or is interrupted by a
    cancellation. A stage that raises is recorded as failed with the exception
    text; a stage that returns False while the job is cancelled is recorded as
    cancelled rather than failed.
    """
    for name in stages:
        if await _is_cancelled(job_id):
            return False

        await _start_stage(job_id, name)
        _push_update()

        try:
            passed = await _dispatch_stage(job_id, name, devname, drive_id)
        except Exception as exc:
            log.error("Stage raised exception: %s", exc, extra={"job_id": job_id, "devname": devname, "stage": name})
            await _finish_stage(job_id, name, success=False, error_text=str(exc))
            _push_update()
            return False

        if passed:
            await _finish_stage(job_id, name, success=passed)
        elif await _is_cancelled(job_id):
            # Stage was aborted due to cancellation — mark it cancelled, not failed
            await _cancel_stage(job_id, name)
        else:
            await _finish_stage(job_id, name, success=passed)

        await _recalculate_progress(job_id)
        _push_update()

        if not passed:
            return False

    return True
|
|
|
|
|
|
async def _dispatch_stage(job_id: int, stage_name: str, devname: str, drive_id: int) -> bool:
|
|
if stage_name == "precheck":
|
|
return await _stage_precheck(job_id, drive_id)
|
|
elif stage_name == "short_smart":
|
|
return await _stage_smart_test(job_id, devname, "SHORT", "short_smart", drive_id)
|
|
elif stage_name == "long_smart":
|
|
return await _stage_smart_test(job_id, devname, "LONG", "long_smart", drive_id)
|
|
elif stage_name == "surface_validate":
|
|
return await _stage_surface_validate(job_id, devname, drive_id)
|
|
elif stage_name == "io_validate":
|
|
return await _stage_timed_simulate(job_id, "io_validate", settings.io_validate_seconds)
|
|
elif stage_name == "final_check":
|
|
return await _stage_final_check(job_id, devname, drive_id)
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Individual stage implementations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _stage_precheck(job_id: int, drive_id: int) -> bool:
    """Check SMART health and temperature before starting destructive work.

    Reads the drive's stored ``smart_health`` and ``temperature_c``. Fails
    (returns False) when the drive row is missing, health is "FAILED", or the
    temperature is above ``settings.temp_crit_c``; the last two also record a
    stage error message.
    """
    async with _db() as conn:
        cursor = await conn.execute(
            "SELECT smart_health, temperature_c FROM drives WHERE id=?", (drive_id,)
        )
        drive = await cursor.fetchone()

    if not drive:
        return False

    smart_health, temperature = drive[0], drive[1]

    if smart_health == "FAILED":
        await _set_stage_error(job_id, "precheck", "Drive SMART health is FAILED — refusing to burn in")
        return False

    # A NULL (or zero) temperature reading is treated as "no data", not as a failure.
    too_hot = bool(temperature) and temperature > settings.temp_crit_c
    if too_hot:
        await _set_stage_error(job_id, "precheck", f"Drive temperature {temperature}°C exceeds {settings.temp_crit_c}°C limit")
        return False

    await asyncio.sleep(1)  # Simulate brief check
    return True
|
|
|
|
|
|
async def _stage_smart_test(job_id: int, devname: str, test_type: str, stage_name: str,
                            drive_id: int | None = None) -> bool:
    """Run a SMART test stage via SSH when configured, else via the TrueNAS REST API."""
    from app import ssh_client

    if not ssh_client.is_configured():
        return await _stage_smart_test_api(job_id, devname, test_type, stage_name)
    return await _stage_smart_test_ssh(job_id, devname, test_type, stage_name, drive_id)
|
|
|
|
|
|
async def _stage_smart_test_api(job_id: int, devname: str, test_type: str, stage_name: str) -> bool:
    """TrueNAS REST API path for SMART test (mock / dev mode).

    Starts the test through the module-level client, then polls the client's
    SMART job list every POLL_INTERVAL seconds until the job reports SUCCESS
    (True) or FAILED/ABORTED (False). Also returns False when the burn-in job
    is cancelled or the TrueNAS job is no longer listed.
    """
    tn_job_id = await _client.start_smart_test([devname], test_type)

    while True:
        if await _is_cancelled(job_id):
            # Best effort: the burn-in is being abandoned whether or not the abort succeeds.
            try:
                await _client.abort_job(tn_job_id)
            except Exception:
                pass
            return False

        tn_job = None
        for candidate in await _client.get_smart_jobs():
            if candidate["id"] == tn_job_id:
                tn_job = candidate
                break

        if not tn_job:
            return False

        tn_state = tn_job["state"]
        percent = tn_job["progress"]["percent"]

        await _update_stage_percent(job_id, stage_name, percent)
        await _recalculate_progress(job_id, None)
        _push_update()

        if tn_state == "SUCCESS":
            return True
        if tn_state in ("FAILED", "ABORTED"):
            message = tn_job.get("error") or f"SMART {test_type} test failed"
            await _set_stage_error(job_id, stage_name, message)
            return False

        await asyncio.sleep(POLL_INTERVAL)
|
|
|
|
|
|
async def _stage_smart_test_ssh(job_id: int, devname: str, test_type: str, stage_name: str,
                                drive_id: int | None) -> bool:
    """SSH path for SMART test — runs smartctl directly on TrueNAS.

    Starts the test, then polls every POLL_INTERVAL seconds, appending each
    poll's output to the stage log. When the test passes and ``drive_id`` is
    given, the SMART attributes are fetched and stored; attribute failures fail
    the stage, warnings are only logged. A poll error or a state other than
    running/passed/failed simply leads to another poll.
    """
    from app import ssh_client

    # Start the test
    try:
        startup_output = await ssh_client.start_smart_test(devname, test_type)
        await _append_stage_log(job_id, stage_name, startup_output + "\n")
    except Exception as exc:
        await _set_stage_error(job_id, stage_name, f"Failed to start SMART test via SSH: {exc}")
        return False

    # Brief pause to let the test register in smartctl output
    await asyncio.sleep(3)

    # Poll until complete
    while True:
        if await _is_cancelled(job_id):
            # Best effort: the stage ends regardless of whether the abort succeeds.
            try:
                await ssh_client.abort_smart_test(devname)
            except Exception:
                pass
            return False

        await asyncio.sleep(POLL_INTERVAL)

        try:
            poll = await ssh_client.poll_smart_progress(devname)
        except Exception as exc:
            log.warning("SSH SMART poll failed: %s", exc, extra={"job_id": job_id})
            await _append_stage_log(job_id, stage_name, f"[poll error] {exc}\n")
            continue

        await _append_stage_log(job_id, stage_name, poll["output"] + "\n---\n")

        test_state = poll["state"]

        if test_state == "failed":
            await _set_stage_error(job_id, stage_name, f"SMART {test_type} test failed")
            return False

        if test_state == "running":
            done_pct = max(0, 100 - poll["percent_remaining"])
            await _update_stage_percent(job_id, stage_name, done_pct)
            await _recalculate_progress(job_id)
            _push_update()
            continue

        if test_state != "passed":
            # "unknown" → keep polling
            continue

        await _update_stage_percent(job_id, stage_name, 100)
        # Run attribute check
        if drive_id is not None:
            try:
                attrs = await ssh_client.get_smart_attributes(devname)
                await _store_smart_attrs(drive_id, attrs)
                await _store_smart_raw_output(drive_id, test_type, attrs["raw_output"])
                if attrs["failures"]:
                    error = "SMART attribute failures: " + "; ".join(attrs["failures"])
                    await _set_stage_error(job_id, stage_name, error)
                    return False
                if attrs["warnings"]:
                    await _append_stage_log(
                        job_id, stage_name,
                        "[WARNING] " + "; ".join(attrs["warnings"]) + "\n"
                    )
            except Exception as exc:
                # The self-test itself passed; a failed attribute read does not fail the stage.
                log.warning("Failed to retrieve SMART attributes: %s", exc)
        await _recalculate_progress(job_id)
        _push_update()
        return True
|
|
|
|
|
|
async def _stage_surface_validate(job_id: int, devname: str, drive_id: int) -> bool:
    """
    Surface validation stage.

    SSH mode: runs badblocks -wsv -b 4096 -p 1 /dev/{devname}.
    Mock mode: simulated timed progress (no real I/O), lasting
    ``settings.surface_validate_seconds``.
    """
    from app import ssh_client

    if not ssh_client.is_configured():
        return await _stage_timed_simulate(job_id, "surface_validate", settings.surface_validate_seconds)
    return await _stage_surface_validate_ssh(job_id, devname, drive_id)
|
|
|
|
|
|
async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) -> bool:
    """Run badblocks over SSH, streaming output to stage log.

    stdout lines consisting only of digits are counted as bad blocks; stderr
    lines matching "<n>% done" drive the stage percentage (capped at 99 while
    the command runs). The remote process is killed when the bad-block count
    exceeds ``settings.bad_block_threshold`` or the job is cancelled.

    Returns True only when the command ran to completion, exited with status 0
    (or reported no status), and the bad-block count is within the threshold.
    Returns False on cancellation, threshold breach, non-zero exit status, or
    any SSH/processing error; all but cancellation record a stage error.

    ``drive_id`` is accepted for signature parity with the other stages and is
    not used here.
    """
    import contextlib
    import re
    import shlex

    from app import ssh_client

    stage = "surface_validate"
    flush_every = 20  # buffered lines between appends to the stage log

    await _append_stage_log(
        job_id, stage,
        f"[START] badblocks -wsv -b 4096 -p 1 /dev/{devname}\n"
        f"[NOTE] This is a DESTRUCTIVE write test. All data on /dev/{devname} will be overwritten.\n\n"
    )

    progress_re = re.compile(r"([\d.]+)%\s+done")
    output_lines: list[str] = []
    flushed = 0            # number of output_lines already written to the log
    bad_blocks_total = 0
    aborted = False        # threshold exceeded, process killed by us
    cancelled = False      # job cancelled, process killed by us
    exit_status = None

    async def _flush_log(force: bool = False) -> None:
        """Append not-yet-written lines to the stage log exactly once."""
        nonlocal flushed
        end = len(output_lines)
        if end == flushed or (end - flushed < flush_every and not force):
            return
        chunk = "".join(output_lines[flushed:end])
        # Advance before awaiting so the concurrently running drain cannot
        # write the same lines again.
        flushed = end
        await _append_stage_log(job_id, stage, chunk)

    try:
        async with await ssh_client._connect() as conn:
            # Quote the device path: it is interpolated into a shell command line.
            device = shlex.quote(f"/dev/{devname}")
            cmd = f"badblocks -wsv -b 4096 -p 1 {device}"
            async with conn.create_process(cmd) as proc:

                async def _drain(stream, is_stderr: bool) -> None:
                    """Consume one output stream until EOF, abort, or cancellation."""
                    nonlocal bad_blocks_total, aborted, cancelled
                    try:
                        async for raw in stream:
                            line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
                            output_lines.append(line)

                            if is_stderr:
                                m = progress_re.search(line)
                                if m:
                                    try:
                                        pct = min(99, int(float(m.group(1))))
                                    except ValueError:
                                        pct = None  # e.g. a lone "." matched the pattern
                                    if pct is not None:
                                        await _update_stage_percent(job_id, stage, pct)
                                        await _update_stage_bad_blocks(job_id, stage, bad_blocks_total)
                                        await _recalculate_progress(job_id)
                                        _push_update()
                            elif line.strip().isdigit():
                                bad_blocks_total += 1

                            # Append to DB log in chunks
                            await _flush_log()

                            # Abort on bad block threshold
                            if bad_blocks_total > settings.bad_block_threshold:
                                if not aborted:
                                    aborted = True
                                    proc.kill()
                                    output_lines.append(
                                        f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded "
                                        f"threshold ({settings.bad_block_threshold})\n"
                                    )
                                return

                            if await _is_cancelled(job_id):
                                cancelled = True
                                proc.kill()
                                return
                    except Exception:
                        # Stop the remote command so the other drain reaches EOF
                        # instead of waiting for the whole scan to finish.
                        with contextlib.suppress(OSError):
                            proc.kill()
                        raise

                outcomes = await asyncio.gather(
                    _drain(proc.stdout, False),
                    _drain(proc.stderr, True),
                    return_exceptions=True,
                )
                await proc.wait()
                exit_status = proc.exit_status

                # A drain that crashed means the bad-block count is unreliable;
                # surface the error instead of judging the stage on partial data.
                for outcome in outcomes:
                    if isinstance(outcome, BaseException):
                        raise outcome

        # Flush remaining output
        await _flush_log(force=True)

    except asyncio.CancelledError:
        return False
    except Exception as exc:
        await _append_stage_log(job_id, stage, f"\n[SSH error] {exc}\n")
        await _set_stage_error(job_id, stage, f"SSH badblocks error: {exc}")
        return False

    await _update_stage_bad_blocks(job_id, stage, bad_blocks_total)

    if cancelled:
        # The caller records the stage as cancelled when it sees False here.
        return False

    if bad_blocks_total > settings.bad_block_threshold:
        await _set_stage_error(
            job_id, stage,
            f"Surface validate FAILED: {bad_blocks_total} bad block(s) found "
            f"(threshold: {settings.bad_block_threshold})"
        )
        return False

    if exit_status not in (None, 0):
        # e.g. command not found or device missing: nothing was actually tested.
        await _set_stage_error(
            job_id, stage,
            f"badblocks exited with status {exit_status}"
        )
        return False

    return True
|
|
|
|
|
|
async def _stage_timed_simulate(job_id: int, stage_name: str, duration_seconds: int) -> bool:
    """Simulate a timed stage with progress updates (mock / dev mode).

    Progress is the elapsed share of ``duration_seconds``, published every
    POLL_INTERVAL seconds. A zero or negative duration completes on the first
    pass instead of dividing by zero.

    Returns True once progress reaches 100, False if the job is cancelled first.
    """
    start = time.monotonic()

    while True:
        if await _is_cancelled(job_id):
            return False

        if duration_seconds > 0:
            elapsed = time.monotonic() - start
            pct = min(100, int(elapsed / duration_seconds * 100))
        else:
            pct = 100

        await _update_stage_percent(job_id, stage_name, pct)
        await _recalculate_progress(job_id, None)
        _push_update()

        if pct >= 100:
            return True

        await asyncio.sleep(POLL_INTERVAL)
|
|
|
|
|
|
async def _stage_final_check(job_id: int, devname: str, drive_id: int | None = None) -> bool:
    """
    Verify drive passed all tests.

    SSH mode: fetch SMART attributes, store them, and fail on health "FAILED"
    or any reported attribute failure.
    Mock mode: check SMART health field in DB. This path is also used when the
    SSH check raises.
    """
    await asyncio.sleep(1)
    from app import ssh_client

    use_ssh = ssh_client.is_configured() and drive_id is not None
    if use_ssh:
        try:
            attrs = await ssh_client.get_smart_attributes(devname)
            await _store_smart_attrs(drive_id, attrs)
            healthy = not (attrs["health"] == "FAILED" or attrs["failures"])
            if healthy:
                return True
            reasons = attrs["failures"] or [f"SMART health: {attrs['health']}"]
            await _set_stage_error(job_id, "final_check",
                                   "Final check failed: " + "; ".join(reasons))
            return False
        except Exception as exc:
            log.warning("SSH final_check failed, falling back to DB check: %s", exc)

    # DB check (mock mode fallback)
    async with _db() as conn:
        cursor = await conn.execute(
            "SELECT smart_health FROM drives WHERE devname=?", (devname,)
        )
        drive = await cursor.fetchone()

    # A missing drive row is reported with the same message as a FAILED health.
    if not drive or drive[0] == "FAILED":
        await _set_stage_error(job_id, "final_check", "Drive SMART health is FAILED after burn-in")
        return False

    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _is_cancelled(job_id: int) -> bool:
    """Return True when the job row exists and its state is 'cancelled'."""
    async with _db() as conn:
        cursor = await conn.execute("SELECT state FROM burnin_jobs WHERE id=?", (job_id,))
        job = await cursor.fetchone()
    if not job:
        return False
    return job[0] == "cancelled"
|
|
|
|
|
|
async def _start_stage(job_id: int, stage_name: str) -> None:
    """Mark a stage as running (with its start time) and record it as the job's current stage."""
    started_at = _now()
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET state='running', started_at=? WHERE burnin_job_id=? AND stage_name=?",
            (started_at, job_id, stage_name),
        )
        await conn.execute(
            "UPDATE burnin_jobs SET stage_name=? WHERE id=?",
            (stage_name, job_id),
        )
        await conn.commit()
|
|
|
|
|
|
async def _finish_stage(job_id: int, stage_name: str, success: bool, error_text: str | None = None) -> None:
    """Record a stage's final state, finish time and duration.

    The stage becomes 'passed' (percent 100) or 'failed' (percent NULL). The
    duration is derived from the stored ``started_at`` and left NULL when that
    value is missing or cannot be parsed. ``error_text`` is written only when
    given, so a message the stage stored itself is preserved.
    """
    finished_at = _now()
    final_state = "passed" if success else "failed"
    final_percent = 100 if success else None

    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        cursor = await conn.execute(
            "SELECT started_at FROM burnin_stages WHERE burnin_job_id=? AND stage_name=?",
            (job_id, stage_name),
        )
        stage_row = await cursor.fetchone()

        elapsed_seconds = None
        if stage_row and stage_row[0]:
            try:
                started = datetime.fromisoformat(stage_row[0])
                # Timestamps stored without an offset are taken to be UTC.
                if started.tzinfo is None:
                    started = started.replace(tzinfo=timezone.utc)
                elapsed_seconds = (datetime.now(timezone.utc) - started).total_seconds()
            except Exception:
                pass

        # Only overwrite error_text if one is passed; otherwise preserve what the stage already wrote
        if error_text is None:
            await conn.execute(
                """UPDATE burnin_stages
                   SET state=?, percent=?, finished_at=?, duration_seconds=?
                   WHERE burnin_job_id=? AND stage_name=?""",
                (final_state, final_percent, finished_at, elapsed_seconds, job_id, stage_name),
            )
        else:
            await conn.execute(
                """UPDATE burnin_stages
                   SET state=?, percent=?, finished_at=?, duration_seconds=?, error_text=?
                   WHERE burnin_job_id=? AND stage_name=?""",
                (final_state, final_percent, finished_at, elapsed_seconds, error_text, job_id, stage_name),
            )
        await conn.commit()
|
|
|
|
|
|
async def _update_stage_percent(job_id: int, stage_name: str, pct: int) -> None:
    """Store ``pct`` as the progress percentage of one stage of a job."""
    params = (pct, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET percent=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
|
|
|
|
|
|
async def _cancel_stage(job_id: int, stage_name: str) -> None:
    """Mark one stage of a job as 'cancelled' and stamp its finish time."""
    finished_at = _now()
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET state='cancelled', finished_at=? WHERE burnin_job_id=? AND stage_name=?",
            (finished_at, job_id, stage_name),
        )
        await conn.commit()
|
|
|
|
|
|
async def _append_stage_log(job_id: int, stage_name: str, text: str) -> None:
    """Append text to the log_text column of a burnin_stages row.

    A NULL log_text is treated as an empty string, so the first append works
    on a fresh row.
    """
    params = (text, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            """UPDATE burnin_stages
               SET log_text = COALESCE(log_text, '') || ?
               WHERE burnin_job_id=? AND stage_name=?""",
            params,
        )
        await conn.commit()
|
|
|
|
|
|
async def _update_stage_bad_blocks(job_id: int, stage_name: str, count: int) -> None:
    """Store ``count`` as the bad-block total of one stage of a job."""
    params = (count, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET bad_blocks=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
|
|
|
|
|
|
async def _store_smart_attrs(drive_id: int, attrs: dict) -> None:
    """Persist latest SMART attribute dict to drives.smart_attrs (JSON).

    The stored object has the keys "health", "attrs", "warnings" and
    "failures"; missing inputs default to "UNKNOWN", {} and [] respectively.
    """
    import json

    payload = {
        "health": attrs.get("health", "UNKNOWN"),
        # Convert int keys to str for JSON serialisation
        "attrs": {str(attr_id): value for attr_id, value in attrs.get("attributes", {}).items()},
        "warnings": attrs.get("warnings", []),
        "failures": attrs.get("failures", []),
    }
    encoded = json.dumps(payload)

    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute("UPDATE drives SET smart_attrs=? WHERE id=?", (encoded, drive_id))
        await conn.commit()
|
|
|
|
|
|
async def _store_smart_raw_output(drive_id: int, test_type: str, raw: str) -> None:
    """Store raw smartctl output in smart_tests.raw_output.

    Rows are matched on the drive id and the lower-cased ``test_type``.
    """
    test_key = test_type.lower()
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE smart_tests SET raw_output=? WHERE drive_id=? AND test_type=?",
            (raw, drive_id, test_key),
        )
        await conn.commit()
|
|
|
|
|
|
async def _set_stage_error(job_id: int, stage_name: str, error_text: str) -> None:
    """Store ``error_text`` on one stage of a job, replacing any earlier message."""
    params = (error_text, job_id, stage_name)
    async with _db() as conn:
        await conn.execute("PRAGMA journal_mode=WAL")
        await conn.execute(
            "UPDATE burnin_stages SET error_text=? WHERE burnin_job_id=? AND stage_name=?",
            params,
        )
        await conn.commit()
|
|
|
|
|
|
async def _recalculate_progress(job_id: int, profile: str | None = None) -> None:
    """Recompute overall job % from actual stage rows. profile param is unused (kept for compat).

    Each stage contributes its weight from _STAGE_BASE_WEIGHTS (5 when not
    listed): fully when 'passed', proportionally to its percent when 'running',
    and nothing otherwise. The job's stage_name is set to the running stage,
    or NULL when no stage is running.
    """
    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")

        cursor = await conn.execute(
            "SELECT stage_name, state, percent FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
            (job_id,),
        )
        stage_rows = await cursor.fetchall()
        if not stage_rows:
            return

        weights = [_STAGE_BASE_WEIGHTS.get(stage["stage_name"], 5) for stage in stage_rows]
        weight_sum = sum(weights)
        if weight_sum == 0:
            return

        earned = 0.0
        running_stage = None
        for stage, weight in zip(stage_rows, weights):
            stage_state = stage["state"]
            if stage_state == "passed":
                earned += weight
            elif stage_state == "running":
                earned += weight * (stage["percent"] or 0) / 100
                running_stage = stage["stage_name"]

        overall_pct = int(earned / weight_sum * 100)
        await conn.execute(
            "UPDATE burnin_jobs SET percent=?, stage_name=? WHERE id=?",
            (overall_pct, running_stage, job_id),
        )
        await conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSE push
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _push_update(alert: dict | None = None) -> None:
|
|
"""Notify SSE subscribers that data has changed, with optional browser notification payload."""
|
|
try:
|
|
from app import poller
|
|
poller._notify_subscribers(alert=alert)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stuck-job detection (called by poller every ~5 cycles)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def check_stuck_jobs() -> None:
    """Mark jobs that have been 'running' beyond stuck_job_hours as 'unknown'.

    For every such job a critical log record and a 'burnin_stuck' audit event
    (operator "system") are written. Subscribers are notified once afterwards.
    Does nothing when no job qualifies.
    """
    limit_seconds = settings.stuck_job_hours * 3600

    async with _db() as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")

        cursor = await conn.execute("""
            SELECT bj.id, bj.drive_id, d.devname, bj.started_at
            FROM burnin_jobs bj
            JOIN drives d ON d.id = bj.drive_id
            WHERE bj.state = 'running'
              AND bj.started_at IS NOT NULL
              AND (julianday('now') - julianday(bj.started_at)) * 86400 > ?
        """, (limit_seconds,))
        stuck_rows = await cursor.fetchall()

        if not stuck_rows:
            return

        marked_at = _now()
        for job_id, drive_id, devname, started_at in (tuple(r) for r in stuck_rows):
            log.critical(
                "Stuck burn-in detected — marking unknown",
                extra={"job_id": job_id, "devname": devname, "started_at": started_at},
            )
            await conn.execute(
                "UPDATE burnin_jobs SET state='unknown', finished_at=? WHERE id=?",
                (marked_at, job_id),
            )
            await conn.execute(
                """INSERT INTO audit_events (event_type, drive_id, burnin_job_id, operator, message)
                   VALUES (?,?,?,?,?)""",
                ("burnin_stuck", drive_id, job_id, "system",
                 f"Job stuck for >{settings.stuck_job_hours}h — automatically marked unknown"),
            )

        await conn.commit()

    _push_update()
    log.warning("Marked %d stuck job(s) as unknown", len(stuck_rows))
|