import asyncio
import csv
import io
import json
from datetime import datetime, timezone

import aiosqlite
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from sse_starlette.sse import EventSourceResponse

from app import burnin, mailer, poller, settings_store
from app.config import settings
from app.database import get_db
from app.models import (
    BurninJobResponse,
    BurninStageResponse,
    CancelBurninRequest,
    DriveResponse,
    SmartTestState,
    StartBurninRequest,
    UpdateDriveRequest,
)
from app.renderer import templates

router = APIRouter()


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

# Failures that datetime parsing/arithmetic can raise for malformed, missing
# or out-of-range timestamp values.
_TIMESTAMP_ERRORS = (TypeError, ValueError, OverflowError)


def _parse_utc(timestamp: str) -> datetime:
    """Parse an ISO-8601 string; a value without tzinfo is taken to be UTC."""
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _requested_test_type(body: dict) -> str:
    """Return the ``type`` field of a SMART request body, or "" if unusable.

    Non-string values (numbers, lists, ...) are mapped to "" so that callers
    reject them with their normal 422 instead of failing on ``.upper()``.
    """
    raw = body.get("type")
    return raw if isinstance(raw, str) else ""


def _eta_seconds(eta_at: str | None) -> int | None:
    """Return whole seconds remaining until *eta_at*, clamped at 0.

    Returns ``None`` when *eta_at* is empty or cannot be parsed.
    """
    if not eta_at:
        return None
    try:
        remaining = (_parse_utc(eta_at) - datetime.now(timezone.utc)).total_seconds()
    except _TIMESTAMP_ERRORS:
        return None
    return max(0, int(remaining))


def _is_stale(last_polled_at: str | None) -> bool:
    """Return True when the last poll is older than the stale threshold.

    A missing or unparseable timestamp counts as stale.
    """
    try:
        age = (datetime.now(timezone.utc) - _parse_utc(last_polled_at)).total_seconds()
    except _TIMESTAMP_ERRORS:
        return True
    return age > settings.stale_threshold_seconds


def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
    """Build a SmartTestState from the ``<prefix>_*`` columns of *row*.

    *prefix* is ``"short"`` or ``"long"``, matching the aliases used in
    ``_DRIVES_QUERY``. A NULL state is reported as ``"idle"``.
    """
    eta_at = row[f"{prefix}_eta_at"]
    return SmartTestState(
        state=row[f"{prefix}_state"] or "idle",
        percent=row[f"{prefix}_percent"],
        eta_seconds=_eta_seconds(eta_at),
        eta_timestamp=eta_at,
        started_at=row[f"{prefix}_started_at"],
        finished_at=row[f"{prefix}_finished_at"],
        error_text=row[f"{prefix}_error"],
    )


def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
    """Convert one ``_DRIVES_QUERY`` result row into a DriveResponse."""
    return DriveResponse(
        id=row["id"],
        devname=row["devname"],
        serial=row["serial"],
        model=row["model"],
        size_bytes=row["size_bytes"],
        temperature_c=row["temperature_c"],
        smart_health=row["smart_health"] or "UNKNOWN",
        last_polled_at=row["last_polled_at"],
        is_stale=_is_stale(row["last_polled_at"]),
        smart_short=_build_smart(row, "short"),
        smart_long=_build_smart(row, "long"),
        notes=row["notes"],
        location=row["location"],
    )


def _compute_status(drive: dict) -> str:
    """Collapse a dumped drive dict into one status string.

    Precedence: ``running`` > ``failed`` > ``passed`` > ``idle``. A drive is
    ``failed`` if either SMART test failed or its health is ``FAILED``.
    """
    short = (drive.get("smart_short") or {}).get("state", "idle")
    long_ = (drive.get("smart_long") or {}).get("state", "idle")
    health = drive.get("smart_health", "UNKNOWN")

    if "running" in (short, long_):
        return "running"
    if short == "failed" or long_ == "failed" or health == "FAILED":
        return "failed"
    if "passed" in (short, long_):
        return "passed"
    return "idle"


# One row per drive, with its short and long SMART test columns joined in.
# ``{where}`` is filled with a fixed clause by the callers in this module.
_DRIVES_QUERY = """
    SELECT
        d.id, d.devname, d.serial, d.model, d.size_bytes,
        d.temperature_c, d.smart_health, d.last_polled_at,
        d.notes, d.location,
        s.state AS short_state,
        s.percent AS short_percent,
        s.started_at AS short_started_at,
        s.eta_at AS short_eta_at,
        s.finished_at AS short_finished_at,
        s.error_text AS short_error,
        l.state AS long_state,
        l.percent AS long_percent,
        l.started_at AS long_started_at,
        l.eta_at AS long_eta_at,
        l.finished_at AS long_finished_at,
        l.error_text AS long_error
    FROM drives d
    LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
    LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
    {where}
    ORDER BY d.devname
"""


async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
    """Return latest burn-in job (any state) keyed by drive_id."""
    cur = await db.execute("""
        SELECT bj.*
        FROM burnin_jobs bj
        WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
    """)
    rows = await cur.fetchall()
    return {r["drive_id"]: dict(r) for r in rows}


async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
    """Return every drive as a dict ready for the HTML templates.

    Each dict is the dumped DriveResponse plus a computed ``status`` and the
    drive's latest burn-in job under ``burnin`` (``None`` if it has none).
    """
    cur = await db.execute(_DRIVES_QUERY.format(where=""))
    rows = await cur.fetchall()
    burnin_by_drive = await _fetch_burnin_by_drive(db)

    drives = []
    for row in rows:
        d = _row_to_drive(row).model_dump()
        d["status"] = _compute_status(d)
        d["burnin"] = burnin_by_drive.get(d["id"])
        drives.append(d)
    return drives


def _stale_context(poller_state: dict) -> dict:
    """Return the ``stale`` / ``stale_seconds`` template variables.

    When the poller has no ``last_poll_at`` yet, or it cannot be parsed, the
    data is reported as not stale with zero elapsed seconds.
    """
    not_stale = {"stale": False, "stale_seconds": 0}
    last = poller_state.get("last_poll_at")
    if not last:
        return not_stale
    try:
        elapsed = int((datetime.now(timezone.utc) - _parse_utc(last)).total_seconds())
    except _TIMESTAMP_ERRORS:
        return not_stale
    return {
        "stale": elapsed > settings.stale_threshold_seconds,
        "stale_seconds": elapsed,
    }


# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------

@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, db: aiosqlite.Connection = Depends(get_db)):
    """Render the dashboard page with all drives and the poller state."""
    drives = await _fetch_drives_for_template(db)
    ps = poller.get_state()
    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "drives": drives,
        "poller": ps,
        **_stale_context(ps),
    })


# ---------------------------------------------------------------------------
# SSE — live drive table updates
# ---------------------------------------------------------------------------

@router.get("/sse/drives")
async def sse_drives(request: Request):
    """Stream re-rendered drive-table HTML each time the poller notifies.

    Events emitted: ``keepalive`` after 25 s without a notification,
    ``drives-update`` with the rendered table, and ``job-alert`` when the
    notification payload carries an ``alert`` entry.
    """
    q = poller.subscribe()

    async def generate():
        try:
            while True:
                # Wait for next poll notification or keepalive timeout
                try:
                    payload = await asyncio.wait_for(q.get(), timeout=25.0)
                except asyncio.TimeoutError:
                    if await request.is_disconnected():
                        break
                    yield {"event": "keepalive", "data": ""}
                    continue

                if await request.is_disconnected():
                    break

                # Extract alert from payload (may be None for regular polls)
                alert = None
                if isinstance(payload, dict):
                    alert = payload.get("alert")

                # Render fresh table HTML. A dedicated connection is opened
                # here because this generator runs outside the get_db
                # dependency used by the other handlers.
                async with aiosqlite.connect(settings.db_path) as db:
                    db.row_factory = aiosqlite.Row
                    await db.execute("PRAGMA journal_mode=WAL")
                    drives = await _fetch_drives_for_template(db)

                html = templates.env.get_template(
                    "components/drives_table.html"
                ).render(drives=drives)
                yield {"event": "drives-update", "data": html}

                # Push browser notification event if this was a job completion
                if alert:
                    yield {"event": "job-alert", "data": json.dumps(alert)}
        finally:
            # Always release the subscription, including on client disconnect.
            poller.unsubscribe(q)

    return EventSourceResponse(generate())


# ---------------------------------------------------------------------------
# JSON API
# ---------------------------------------------------------------------------

@router.get("/health")
async def health(db: aiosqlite.Connection = Depends(get_db)):
    """Report poller health and the number of drives in the database."""
    ps = poller.get_state()
    cur = await db.execute("SELECT COUNT(*) FROM drives")
    row = await cur.fetchone()
    drives_tracked = row[0] if row else 0
    return {
        "status": "ok" if ps["healthy"] else "degraded",
        "last_poll_at": ps["last_poll_at"],
        "last_error": ps["last_error"],
        "consecutive_failures": ps.get("consecutive_failures", 0),
        "poll_interval_seconds": settings.poll_interval_seconds,
        "drives_tracked": drives_tracked,
    }


@router.get("/api/v1/drives", response_model=list[DriveResponse])
async def list_drives(db: aiosqlite.Connection = Depends(get_db)):
    """Return all drives ordered by device name."""
    cur = await db.execute(_DRIVES_QUERY.format(where=""))
    rows = await cur.fetchall()
    return [_row_to_drive(r) for r in rows]


@router.get("/api/v1/drives/{drive_id}", response_model=DriveResponse)
async def get_drive(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return a single drive, or 404 if *drive_id* does not exist."""
    cur = await db.execute(
        _DRIVES_QUERY.format(where="WHERE d.id = ?"), (drive_id,)
    )
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    return _row_to_drive(row)


@router.post("/api/v1/drives/{drive_id}/smart/start")
async def smart_start(
    drive_id: int,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Start a standalone SHORT or LONG SMART test on a single drive.

    Raises 422 for a missing/invalid ``type``, 404 for an unknown drive,
    503 when the TrueNAS client is not initialised and 502 when the TrueNAS
    call itself fails.
    """
    test_type = _requested_test_type(body).upper()
    if test_type not in ("SHORT", "LONG"):
        raise HTTPException(status_code=422, detail="type must be SHORT or LONG")

    cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    devname = row[0]

    # Use the shared TrueNAS client held by the burnin module
    client = burnin._client
    if client is None:
        raise HTTPException(status_code=503, detail="TrueNAS client not ready")

    try:
        tn_job_id = await client.start_smart_test([devname], test_type)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") from exc

    return {"job_id": tn_job_id, "devname": devname, "type": test_type}


@router.post("/api/v1/drives/{drive_id}/smart/cancel")
async def smart_cancel(
    drive_id: int,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Cancel a running standalone SMART test on a drive.

    Raises 422 for a missing/invalid ``type``, 404 for an unknown drive or
    when no running job is found, 503 when the TrueNAS client is not
    initialised and 502 when a TrueNAS call fails.
    """
    test_type = _requested_test_type(body).lower()
    if test_type not in ("short", "long"):
        raise HTTPException(status_code=422, detail="type must be 'short' or 'long'")

    cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    devname = row[0]

    client = burnin._client
    if client is None:
        raise HTTPException(status_code=503, detail="TrueNAS client not ready")

    # Find the running TrueNAS job for this drive.
    # NOTE(review): the job is matched on devname only; the requested
    # test_type is not compared against the job, so the first RUNNING job
    # that lists this disk is the one aborted — confirm that is intended.
    try:
        jobs = await client.get_smart_jobs()
        tn_job_id = None
        for j in jobs:
            if j.get("state") != "RUNNING":
                continue
            args = j.get("arguments", [])
            if not args or not isinstance(args[0], dict):
                continue
            if devname in args[0].get("disks", []):
                tn_job_id = j["id"]
                break

        if tn_job_id is None:
            raise HTTPException(
                status_code=404,
                detail="No running SMART test found for this drive",
            )

        await client.abort_job(tn_job_id)
    except HTTPException:
        # Our own 404 above must not be rewrapped as a 502.
        raise
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") from exc

    # Update local DB state
    now = datetime.now(timezone.utc).isoformat()
    await db.execute(
        "UPDATE smart_tests SET state='aborted', finished_at=? "
        "WHERE drive_id=? AND test_type=? AND state='running'",
        (now, drive_id, test_type),
    )
    await db.commit()

    return {"cancelled": True, "devname": devname, "type": test_type}


# ---------------------------------------------------------------------------
# Burn-in API
# ---------------------------------------------------------------------------

def _row_to_burnin(row: aiosqlite.Row, stages: list[aiosqlite.Row]) -> BurninJobResponse:
    """Build a BurninJobResponse from a job row and its stage rows.

    NULL ``percent`` values (job and stages) are reported as 0.
    """
    return BurninJobResponse(
        id=row["id"],
        drive_id=row["drive_id"],
        profile=row["profile"],
        state=row["state"],
        percent=row["percent"] or 0,
        stage_name=row["stage_name"],
        operator=row["operator"],
        created_at=row["created_at"],
        started_at=row["started_at"],
        finished_at=row["finished_at"],
        error_text=row["error_text"],
        stages=[
            BurninStageResponse(
                id=s["id"],
                stage_name=s["stage_name"],
                state=s["state"],
                percent=s["percent"] or 0,
                started_at=s["started_at"],
                finished_at=s["finished_at"],
                error_text=s["error_text"],
            )
            for s in stages
        ],
    )


@router.post("/api/v1/burnin/start")
async def burnin_start(req: StartBurninRequest):
    """Start a burn-in job for each requested drive.

    A drive whose ``burnin.start_job`` call raises ``ValueError`` is listed
    under ``errors`` and the remaining drives are still processed. If no
    drive succeeded and at least one failed, 409 is raised with the first
    error message.
    """
    results = []
    errors = []
    for drive_id in req.drive_ids:
        try:
            job_id = await burnin.start_job(
                drive_id, req.profile, req.operator, stage_order=req.stage_order
            )
        except ValueError as exc:
            errors.append({"drive_id": drive_id, "error": str(exc)})
        else:
            results.append({"drive_id": drive_id, "job_id": job_id})

    if errors and not results:
        raise HTTPException(status_code=409, detail=errors[0]["error"])
    return {"queued": results, "errors": errors}


@router.post("/api/v1/burnin/{job_id}/cancel")
async def burnin_cancel(job_id: int, req: CancelBurninRequest):
    """Cancel a burn-in job; 409 when ``burnin.cancel_job`` reports failure."""
    ok = await burnin.cancel_job(job_id, req.operator)
    if not ok:
        raise HTTPException(status_code=409, detail="Job not found or not cancellable")
    return {"cancelled": True}


# ---------------------------------------------------------------------------
# History pages
# ---------------------------------------------------------------------------

_PAGE_SIZE = 50
_ALL_STATES = ("queued", "running", "passed", "failed", "cancelled", "unknown")

# Burn-in jobs joined with their drive, newest first. ``duration_seconds`` is
# NULL while either timestamp is NULL. ``{where}`` is filled by _state_where.
_HISTORY_QUERY = """
    SELECT
        bj.id, bj.drive_id, bj.profile, bj.state, bj.operator,
        bj.created_at, bj.started_at, bj.finished_at, bj.error_text,
        d.devname, d.serial, d.model, d.size_bytes,
        CAST(
            (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
            AS INTEGER
        ) AS duration_seconds
    FROM burnin_jobs bj
    JOIN drives d ON d.id = bj.drive_id
    {where}
    ORDER BY bj.id DESC
"""


def _state_where(state: str) -> tuple[str, list]:
    """Return the WHERE clause and bind parameters for a state filter.

    ``"all"`` means no filter: an empty clause and an empty parameter list.
    """
    if state == "all":
        return "", []
    return "WHERE bj.state = ?", [state]


@router.get("/history", response_class=HTMLResponse)
async def history_list(
    request: Request,
    state: str = Query(default="all"),
    page: int = Query(default=1, ge=1),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render one page of burn-in history, optionally filtered by state.

    An unrecognised *state* falls back to ``"all"``; a *page* beyond the last
    page is clamped to the last page.
    """
    if state not in ("all",) + _ALL_STATES:
        state = "all"

    where_clause, params = _state_where(state)

    # Total count (where_clause is one of two fixed strings, never user text)
    count_sql = (
        "SELECT COUNT(*) FROM burnin_jobs bj "
        f"JOIN drives d ON d.id = bj.drive_id {where_clause}"
    )
    cur = await db.execute(count_sql, params)
    total_count = (await cur.fetchone())[0]
    total_pages = max(1, (total_count + _PAGE_SIZE - 1) // _PAGE_SIZE)
    page = min(page, total_pages)
    offset = (page - 1) * _PAGE_SIZE

    # Per-state counts for badges
    # NOTE(review): these counts read burnin_jobs without the drives join used
    # for total_count, so the two can differ if a job has no drives row.
    cur = await db.execute(
        "SELECT state, COUNT(*) FROM burnin_jobs GROUP BY state"
    )
    per_state = await cur.fetchall()
    counts = {"all": total_count}
    for r in per_state:
        counts[r[0]] = r[1]
    if state != "all":
        # The unfiltered total equals the sum of the grouped counts, so no
        # extra COUNT(*) round trip is needed.
        counts["all"] = sum(r[1] for r in per_state)

    # Job rows
    sql = _HISTORY_QUERY.format(where=where_clause) + " LIMIT ? OFFSET ?"
    cur = await db.execute(sql, params + [_PAGE_SIZE, offset])
    rows = await cur.fetchall()
    jobs = [dict(r) for r in rows]

    ps = poller.get_state()
    return templates.TemplateResponse("history.html", {
        "request": request,
        "jobs": jobs,
        "active_state": state,
        "counts": counts,
        "page": page,
        "total_pages": total_pages,
        "total_count": total_count,
        "poller": ps,
        **_stale_context(ps),
    })


@router.get("/history/{job_id}", response_class=HTMLResponse)
async def history_detail(
    request: Request,
    job_id: int,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the detail page of one burn-in job with its stages (404 if absent)."""
    # Job + drive info
    cur = await db.execute("""
        SELECT
            bj.*, d.devname, d.serial, d.model, d.size_bytes,
            CAST(
                (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        WHERE bj.id = ?
    """, (job_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Burn-in job not found")
    job = dict(row)

    # Stages (with duration)
    cur = await db.execute("""
        SELECT
            *,
            CAST(
                (julianday(finished_at) - julianday(started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds
        FROM burnin_stages
        WHERE burnin_job_id = ?
        ORDER BY id
    """, (job_id,))
    job["stages"] = [dict(r) for r in await cur.fetchall()]

    ps = poller.get_state()
    return templates.TemplateResponse("job_detail.html", {
        "request": request,
        "job": job,
        "poller": ps,
        **_stale_context(ps),
    })


# ---------------------------------------------------------------------------
# CSV export
# ---------------------------------------------------------------------------

@router.get("/api/v1/burnin/export.csv")
async def burnin_export_csv(db: aiosqlite.Connection = Depends(get_db)):
    """Return every burn-in job, newest first, as a CSV attachment."""
    cur = await db.execute("""
        SELECT
            bj.id AS job_id, bj.drive_id, d.devname, d.serial, d.model,
            bj.profile, bj.state, bj.operator,
            bj.created_at, bj.started_at, bj.finished_at,
            CAST(
                (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds,
            bj.error_text
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        ORDER BY bj.id DESC
    """)
    rows = await cur.fetchall()

    buf = io.StringIO()
    writer = csv.writer(buf)
    # Header order must match the SELECT column order above.
    writer.writerow([
        "job_id", "drive_id", "devname", "serial", "model",
        "profile", "state", "operator",
        "created_at", "started_at", "finished_at",
        "duration_seconds", "error_text",
    ])
    writer.writerows(list(r) for r in rows)

    # getvalue() returns the whole buffer regardless of the stream position.
    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=burnin_history.csv"},
    )


# ---------------------------------------------------------------------------
# On-demand email report
# ---------------------------------------------------------------------------

@router.post("/api/v1/report/send")
async def send_report_now():
    """Trigger the daily status email immediately (for testing SMTP config).

    Raises 503 when no SMTP host is configured and 502 when sending fails.
    """
    if not settings.smtp_host:
        raise HTTPException(
            status_code=503,
            detail="SMTP not configured (SMTP_HOST is empty)",
        )
    try:
        await mailer.send_report_now()
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Mail send failed: {exc}") from exc
    return {"sent": True, "to": settings.smtp_to}
# ---------------------------------------------------------------------------
# Drive notes / location update
# ---------------------------------------------------------------------------

@router.patch("/api/v1/drives/{drive_id}")
async def update_drive(
    drive_id: int,
    req: UpdateDriveRequest,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Store the notes and location of a drive; 404 if the drive is unknown.

    NOTE(review): both columns are always written from the request, so a
    request carrying only one of the two fields overwrites the other with
    whatever default UpdateDriveRequest gives it — confirm that is intended
    for a PATCH endpoint.
    """
    cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,))
    if not await cur.fetchone():
        raise HTTPException(status_code=404, detail="Drive not found")

    await db.execute(
        "UPDATE drives SET notes=?, location=? WHERE id=?",
        (req.notes, req.location, drive_id),
    )
    await db.commit()
    return {"updated": True}


# ---------------------------------------------------------------------------
# Audit log page
# ---------------------------------------------------------------------------

# The 200 most recent audit events, with drive identity where one is linked.
_AUDIT_QUERY = """
    SELECT
        ae.id, ae.event_type, ae.operator, ae.message, ae.created_at,
        d.devname, d.serial
    FROM audit_events ae
    LEFT JOIN drives d ON d.id = ae.drive_id
    ORDER BY ae.id DESC
    LIMIT 200
"""

# Event type -> colour name handed to the audit template as ``event_colors``.
_AUDIT_EVENT_COLORS = {
    "burnin_queued": "yellow",
    "burnin_started": "blue",
    "burnin_passed": "passed",
    "burnin_failed": "failed",
    "burnin_cancelled": "cancelled",
    "burnin_stuck": "failed",
    "burnin_unknown": "unknown",
}


@router.get("/audit", response_class=HTMLResponse)
async def audit_log(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the audit log page with the most recent events."""
    cur = await db.execute(_AUDIT_QUERY)
    rows = [dict(r) for r in await cur.fetchall()]
    ps = poller.get_state()
    return templates.TemplateResponse("audit.html", {
        "request": request,
        "events": rows,
        "event_colors": _AUDIT_EVENT_COLORS,
        "poller": ps,
        **_stale_context(ps),
    })


# ---------------------------------------------------------------------------
# Stats / analytics page
# ---------------------------------------------------------------------------

@router.get("/stats", response_class=HTMLResponse)
async def stats_page(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the statistics page: overall counts, per-model and per-day data."""
    # Overall counts
    cur = await db.execute("""
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) as passed,
            SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) as failed,
            SUM(CASE WHEN state='running' THEN 1 ELSE 0 END) as running,
            SUM(CASE WHEN state='cancelled' THEN 1 ELSE 0 END) as cancelled
        FROM burnin_jobs
    """)
    # SUM() yields NULL when burnin_jobs is empty; report those counters as 0
    # so the template receives integers rather than None.
    overall = {
        key: value or 0
        for key, value in dict(await cur.fetchone()).items()
    }

    # Failure rate by drive model (only completed jobs)
    cur = await db.execute("""
        SELECT
            COALESCE(d.model, 'Unknown') AS model,
            COUNT(*) AS total,
            SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) AS passed,
            SUM(CASE WHEN bj.state='failed' THEN 1 ELSE 0 END) AS failed,
            ROUND(
                100.0 * SUM(CASE WHEN bj.state='passed' THEN 1 ELSE 0 END) / COUNT(*),
                1
            ) AS pass_rate
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        WHERE bj.state IN ('passed', 'failed')
        GROUP BY COALESCE(d.model, 'Unknown')
        ORDER BY total DESC
        LIMIT 20
    """)
    by_model = [dict(r) for r in await cur.fetchall()]

    # Activity last 14 days
    cur = await db.execute("""
        SELECT
            date(created_at) AS day,
            COUNT(*) AS total,
            SUM(CASE WHEN state='passed' THEN 1 ELSE 0 END) AS passed,
            SUM(CASE WHEN state='failed' THEN 1 ELSE 0 END) AS failed
        FROM burnin_jobs
        WHERE created_at >= date('now', '-14 days')
        GROUP BY date(created_at)
        ORDER BY day DESC
    """)
    by_day = [dict(r) for r in await cur.fetchall()]

    # Drives tracked
    cur = await db.execute("SELECT COUNT(*) FROM drives")
    drives_total = (await cur.fetchone())[0]

    ps = poller.get_state()
    return templates.TemplateResponse("stats.html", {
        "request": request,
        "overall": overall,
        "by_model": by_model,
        "by_day": by_day,
        "drives_total": drives_total,
        "poller": ps,
        **_stale_context(ps),
    })


# ---------------------------------------------------------------------------
# Settings page
# ---------------------------------------------------------------------------

@router.get("/settings", response_class=HTMLResponse)
async def settings_page(
    request: Request,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the settings page with read-only and editable values."""
    # Read-only display values (require container restart to change)
    readonly = {
        "truenas_base_url": settings.truenas_base_url,
        "truenas_verify_tls": settings.truenas_verify_tls,
        "poll_interval_seconds": settings.poll_interval_seconds,
        "stale_threshold_seconds": settings.stale_threshold_seconds,
        "allowed_ips": settings.allowed_ips or "(allow all)",
        "log_level": settings.log_level,
    }

    # Editable values — real values for form fields (password excluded)
    editable = {
        "smtp_host": settings.smtp_host,
        "smtp_port": settings.smtp_port,
        "smtp_ssl_mode": settings.smtp_ssl_mode or "starttls",
        "smtp_timeout": settings.smtp_timeout,
        "smtp_user": settings.smtp_user,
        "smtp_from": settings.smtp_from,
        "smtp_to": settings.smtp_to,
        "smtp_report_hour": settings.smtp_report_hour,
        "smtp_daily_report_enabled": settings.smtp_daily_report_enabled,
        "smtp_alert_on_fail": settings.smtp_alert_on_fail,
        "smtp_alert_on_pass": settings.smtp_alert_on_pass,
        "webhook_url": settings.webhook_url,
        "stuck_job_hours": settings.stuck_job_hours,
        "max_parallel_burnins": settings.max_parallel_burnins,
    }

    ps = poller.get_state()
    return templates.TemplateResponse("settings.html", {
        "request": request,
        "readonly": readonly,
        "editable": editable,
        "smtp_enabled": bool(settings.smtp_host),
        "poller": ps,
        **_stale_context(ps),
    })


@router.post("/api/v1/settings")
async def save_settings(body: dict):
    """Save editable runtime settings. Password is only updated if non-empty.

    Raises 422 when ``settings_store.save`` rejects the values with
    ``ValueError``.
    """
    # Don't overwrite password if client sent empty string. A filtered copy
    # is passed on so the parsed request body itself is left untouched.
    payload = {
        key: value
        for key, value in body.items()
        if not (key == "smtp_password" and value == "")
    }

    try:
        saved = settings_store.save(payload)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    return {"saved": True, "keys": saved}


@router.post("/api/v1/settings/test-smtp")
async def test_smtp():
    """Test the current SMTP configuration without sending an email."""
    result = await mailer.test_smtp_connection()
    if not result["ok"]:
        raise HTTPException(status_code=502, detail=result["error"])
    return {"ok": True}


# ---------------------------------------------------------------------------
# Print view
# (Registered after the /history/{job_id} detail route. The order is not
# significant: a single path parameter does not span "/", so the detail
# pattern cannot match a path that ends in /print.)
# ---------------------------------------------------------------------------

@router.get("/history/{job_id}/print", response_class=HTMLResponse)
async def history_print(
    request: Request,
    job_id: int,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Render the printable view of one burn-in job with its stages (404 if absent)."""
    cur = await db.execute("""
        SELECT
            bj.*, d.devname, d.serial, d.model, d.size_bytes,
            CAST(
                (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        WHERE bj.id = ?
    """, (job_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Job not found")
    job = dict(row)

    cur = await db.execute("""
        SELECT
            *,
            CAST(
                (julianday(finished_at) - julianday(started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds
        FROM burnin_stages
        WHERE burnin_job_id=?
        ORDER BY id
    """, (job_id,))
    job["stages"] = [dict(r) for r in await cur.fetchall()]

    return templates.TemplateResponse("job_print.html", {
        "request": request,
        "job": job,
    })


# ---------------------------------------------------------------------------
# Burn-in job detail API (must be after export.csv to avoid int coercion)
# ---------------------------------------------------------------------------

@router.get("/api/v1/burnin/{job_id}", response_model=BurninJobResponse)
async def burnin_get(job_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return one burn-in job with its stages as JSON (404 if absent)."""
    # _row_to_burnin indexes rows by column name, so name-based rows are needed.
    db.row_factory = aiosqlite.Row
    cur = await db.execute("SELECT * FROM burnin_jobs WHERE id=?", (job_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Burn-in job not found")

    cur = await db.execute(
        "SELECT * FROM burnin_stages WHERE burnin_job_id=? ORDER BY id", (job_id,)
    )
    stages = await cur.fetchall()
    return _row_to_burnin(row, stages)