diff --git a/app/config.py b/app/config.py index 75b4d43..45e4eb8 100644 --- a/app/config.py +++ b/app/config.py @@ -83,7 +83,7 @@ class Settings(BaseSettings): ssh_key: str = "" # PEM private key content (paste full key including headers) # Application version — used by the /api/v1/updates/check endpoint - app_version: str = "1.0.0-36" + app_version: str = "1.0.0-37" # ---- Authentication (1.0.0-22) ---- # session_secret: HMAC key for signing session cookies. Empty = generate diff --git a/app/routes/__init__.py b/app/routes/__init__.py index 3adcc4c..46338f0 100644 --- a/app/routes/__init__.py +++ b/app/routes/__init__.py @@ -50,6 +50,8 @@ import app.routes.audit as _audit_routes # noqa: E402 import app.routes.stats as _stats_routes # noqa: E402 import app.routes.report as _report_routes # noqa: E402 import app.routes.settings as _settings_routes # noqa: E402 +import app.routes.drives as _drives_routes # noqa: E402 +import app.routes.burnin as _burnin_routes # noqa: E402 router.include_router(_auth_routes.router) router.include_router(_system_routes.router) @@ -58,194 +60,17 @@ router.include_router(_audit_routes.router) router.include_router(_stats_routes.router) router.include_router(_report_routes.router) router.include_router(_settings_routes.router) +router.include_router(_drives_routes.router) +router.include_router(_burnin_routes.router) -# --------------------------------------------------------------------------- -# Internal helpers -# --------------------------------------------------------------------------- - -def _eta_seconds(eta_at: str | None) -> int | None: - if not eta_at: - return None - try: - eta_ts = datetime.fromisoformat(eta_at) - if eta_ts.tzinfo is None: - eta_ts = eta_ts.replace(tzinfo=timezone.utc) - remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds() - return max(0, int(remaining)) - except Exception: - return None - - -# _is_stale is now imported from ._helpers above. 
- - -def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None: - """Linear ETA extrapolation from started_at and percent complete.""" - if not started_at or percent <= 0: - return None - try: - start = datetime.fromisoformat(started_at) - if start.tzinfo is None: - start = start.replace(tzinfo=timezone.utc) - elapsed = (datetime.now(timezone.utc) - start).total_seconds() - total_est = elapsed / (percent / 100) - remaining = max(0, int(total_est - elapsed)) - return remaining - except Exception: - return None - - -def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState: - eta_at = row[f"{prefix}_eta_at"] - return SmartTestState( - state=row[f"{prefix}_state"] or "idle", - percent=row[f"{prefix}_percent"], - eta_seconds=_eta_seconds(eta_at), - eta_timestamp=eta_at, - started_at=row[f"{prefix}_started_at"], - finished_at=row[f"{prefix}_finished_at"], - error_text=row[f"{prefix}_error"], - ) - - -def _row_to_drive(row: aiosqlite.Row) -> DriveResponse: - return DriveResponse( - id=row["id"], - devname=row["devname"], - serial=row["serial"], - model=row["model"], - size_bytes=row["size_bytes"], - temperature_c=row["temperature_c"], - smart_health=row["smart_health"] or "UNKNOWN", - last_polled_at=row["last_polled_at"], - is_stale=_is_stale(row["last_polled_at"]), - smart_short=_build_smart(row, "short"), - smart_long=_build_smart(row, "long"), - notes=row["notes"], - location=row["location"], - pool_name=row["pool_name"], - pool_role=row["pool_role"], - pool_unlocked_until=burnin.unlock_expiry( - row["id"], row["pool_name"], row["pool_role"], - ), - ) - - -def _compute_status(drive: dict) -> str: - short = (drive.get("smart_short") or {}).get("state", "idle") - long_ = (drive.get("smart_long") or {}).get("state", "idle") - health = drive.get("smart_health", "UNKNOWN") - if "running" in (short, long_): - return "running" - if short == "failed" or long_ == "failed" or health == "FAILED": - return "failed" - if "passed" in (short, long_): - 
return "passed" - return "idle" - - -_DRIVES_QUERY = """ - SELECT - d.id, d.devname, d.serial, d.model, d.size_bytes, - d.temperature_c, d.smart_health, d.last_polled_at, - d.notes, d.location, d.pool_name, d.pool_role, - s.state AS short_state, - s.percent AS short_percent, - s.started_at AS short_started_at, - s.eta_at AS short_eta_at, - s.finished_at AS short_finished_at, - s.error_text AS short_error, - l.state AS long_state, - l.percent AS long_percent, - l.started_at AS long_started_at, - l.eta_at AS long_eta_at, - l.finished_at AS long_finished_at, - l.error_text AS long_error - FROM drives d - LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short' - LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long' - WHERE d.last_seen_at >= datetime('now', '-7 days') - {where} - ORDER BY d.devname -""" - - -async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]: - """Return latest burn-in job (any state) keyed by drive_id. - - Jobs created before the drive's last_reset_at are excluded so the - dashboard burn-in column clears after a reset while history is preserved. - """ - cur = await db.execute(""" - SELECT bj.* - FROM burnin_jobs bj - JOIN drives d ON d.id = bj.drive_id - WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id) - AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at) - """) - rows = await cur.fetchall() - return {r["drive_id"]: dict(r) for r in rows} - - -async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]: - cur = await db.execute(_DRIVES_QUERY.format(where="")) - rows = await cur.fetchall() - burnin_by_drive = await _fetch_burnin_by_drive(db) - - # For burn-ins that include SMART stages, fetch those stages so we can - # mirror their progress/result in the Short/Long SMART columns. - # This covers both running stages (showing live progress) and completed - # stages (showing passed/failed after the burn-in moves to the next stage). 
- bi_smart_stages: dict[int, dict[str, dict]] = {} # job_id -> {stage_name: row} - bi_ids_with_smart = [ - bi["id"] for bi in burnin_by_drive.values() - if bi["state"] in ("running", "queued") - ] - if bi_ids_with_smart: - placeholders = ",".join("?" * len(bi_ids_with_smart)) - # placeholders is purely structural ("?,?,?"); IDs themselves are - # bound via the parameter tuple. SQL built via concatenation so - # bandit's B608 (which fires on any f-string SQL) doesn't flag it. - sql = ( - "SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, " - " bs.started_at, bs.finished_at, bs.error_text " - "FROM burnin_stages bs " - "WHERE bs.burnin_job_id IN (" + placeholders + ") " - " AND bs.stage_name IN ('short_smart', 'long_smart') " - " AND bs.state IN ('running', 'passed', 'failed')" - ) - cur = await db.execute(sql, bi_ids_with_smart) - for r in await cur.fetchall(): - bi_smart_stages.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r) - - drives = [] - for row in rows: - d = _row_to_drive(row).model_dump() - d["status"] = _compute_status(d) - bi = burnin_by_drive.get(d["id"]) - d["burnin"] = bi - - # Overlay burn-in SMART stage progress/results onto the SMART columns - if bi and bi["id"] in bi_smart_stages: - for stage_name, stage in bi_smart_stages[bi["id"]].items(): - target = "smart_short" if stage_name == "short_smart" else "smart_long" - # Only overlay if the standalone SMART column is idle/empty - existing = d.get(target) or {} - if existing.get("state") not in (None, "idle"): - continue - pct = stage["percent"] or 0 - d[target] = { - "state": stage["state"], - "percent": pct if stage["state"] == "running" else (100 if stage["state"] == "passed" else 0), - "eta_seconds": _compute_eta_seconds(stage["started_at"], pct) if stage["state"] == "running" else None, - "eta_timestamp": None, - "started_at": stage["started_at"], - "finished_at": stage["finished_at"], - "error_text": stage["error_text"], - } - - drives.append(d) - return drives +# Drives 
helpers — re-exported for the dashboard + SSE handlers in this +# file AND for `from app.routes import _fetch_drives_for_template` +# from mailer.py (existing back-compat shim). +from ._drives_helpers import ( # noqa: E402 + _DRIVES_QUERY, _row_to_drive, _build_smart, _compute_status, + _compute_eta_seconds, _eta_seconds, + _fetch_burnin_by_drive, _fetch_drives_for_template, +) # _stale_context is now imported from ._helpers above. @@ -336,482 +161,3 @@ async def sse_drives(request: Request): # --------------------------------------------------------------------------- - -@router.get("/api/v1/drives", response_model=list[DriveResponse]) -async def list_drives(db: aiosqlite.Connection = Depends(get_db)): - cur = await db.execute(_DRIVES_QUERY.format(where="")) - rows = await cur.fetchall() - return [_row_to_drive(r) for r in rows] - - -@router.get("/api/v1/drives/{drive_id}/drawer") -async def drive_drawer(drive_id: int, db: aiosqlite.Connection = Depends(get_db)): - """Data for the log drawer — latest burn-in job + stages, SMART tests, audit events.""" - cur = await db.execute(_DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,)) - row = await cur.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Drive not found") - drive = _row_to_drive(row) - - # Latest burn-in job + its stages (include log_text and bad_blocks) - cur = await db.execute( - "SELECT * FROM burnin_jobs WHERE drive_id=? ORDER BY id DESC LIMIT 1", - (drive_id,), - ) - job_row = await cur.fetchone() - burnin = None - if job_row: - job = dict(job_row) - cur = await db.execute( - "SELECT id, stage_name, state, percent, started_at, finished_at, " - "duration_seconds, error_text, log_text, bad_blocks " - "FROM burnin_stages WHERE burnin_job_id=? 
ORDER BY id", - (job_row["id"],), - ) - job["stages"] = [dict(r) for r in await cur.fetchall()] - burnin = job - - # SMART raw output from smart_tests table - cur = await db.execute( - "SELECT test_type, state, percent, started_at, finished_at, error_text, raw_output " - "FROM smart_tests WHERE drive_id=?", - (drive_id,), - ) - smart_rows = {r["test_type"]: dict(r) for r in await cur.fetchall()} - - # Cached SMART attributes (JSON blob on drives table) - import json as _json - smart_attrs = None - cur = await db.execute("SELECT smart_attrs FROM drives WHERE id=?", (drive_id,)) - attrs_row = await cur.fetchone() - if attrs_row and attrs_row["smart_attrs"]: - try: - smart_attrs = _json.loads(attrs_row["smart_attrs"]) - except Exception: - pass - - # Last 50 audit events for this drive (newest first) - cur = await db.execute(""" - SELECT id, event_type, operator, message, created_at - FROM audit_events - WHERE drive_id = ? - ORDER BY id DESC - LIMIT 50 - """, (drive_id,)) - events = [dict(r) for r in await cur.fetchall()] - - def _smart_card(test_type: str) -> dict: - smart_obj = drive.smart_short if test_type == "short" else drive.smart_long - base = smart_obj.model_dump() if smart_obj else {} - row = smart_rows.get(test_type, {}) - base["raw_output"] = row.get("raw_output") - return base - - return { - "drive": { - "id": drive.id, - "devname": drive.devname, - "serial": drive.serial, - "model": drive.model, - "size_bytes": drive.size_bytes, - }, - "burnin": burnin, - "smart": { - "short": _smart_card("short"), - "long": _smart_card("long"), - "attrs": smart_attrs, - }, - "events": events, - } - - -@router.get("/api/v1/drives/{drive_id}", response_model=DriveResponse) -async def get_drive(drive_id: int, db: aiosqlite.Connection = Depends(get_db)): - cur = await db.execute( - _DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,) - ) - row = await cur.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Drive not found") - return _row_to_drive(row) 
- - -@router.post("/api/v1/drives/{drive_id}/smart/start") -async def smart_start( - drive_id: int, - body: dict, - db: aiosqlite.Connection = Depends(get_db), -): - """Start a standalone SHORT or LONG SMART test on a single drive. - - Uses SSH (smartctl) when configured — required for TrueNAS SCALE 25.10+ - where the REST smart/test endpoint no longer exists. - Falls back to TrueNAS REST API for older versions. - """ - from app import burnin as _burnin, ssh_client - - test_type = (body.get("type") or "").upper() - if test_type not in ("SHORT", "LONG"): - raise HTTPException(status_code=422, detail="type must be SHORT or LONG") - - cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,)) - row = await cur.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Drive not found") - devname = row[0] - - now = datetime.now(timezone.utc).isoformat() - ttype_lower = test_type.lower() - - if ssh_client.is_configured(): - # SSH path — works on TrueNAS SCALE 25.10+ and CORE - try: - output = await ssh_client.start_smart_test(devname, test_type) - except Exception as exc: - raise HTTPException(status_code=502, detail=f"SSH error: {exc}") - - # Mark as running in DB (truenas_job_id=NULL signals SSH-managed test) - # Store smartctl start output as proof the test was initiated - await db.execute( - """INSERT INTO smart_tests (drive_id, test_type, state, percent, started_at, raw_output) - VALUES (?,?,?,?,?,?) 
- ON CONFLICT(drive_id, test_type) DO UPDATE SET - state='running', percent=0, truenas_job_id=NULL, - started_at=excluded.started_at, finished_at=NULL, error_text=NULL, - raw_output=excluded.raw_output""", - (drive_id, ttype_lower, "running", 0, now, output), - ) - await db.commit() - from app import poller as _poller - _poller._notify_subscribers() - return {"devname": devname, "type": test_type, "message": output[:200]} - - else: - # REST path — older TrueNAS CORE / SCALE versions - client = _burnin._client - if client is None: - raise HTTPException(status_code=503, detail="TrueNAS client not ready") - try: - tn_job_id = await client.start_smart_test([devname], test_type) - except Exception as exc: - raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") - return {"job_id": tn_job_id, "devname": devname, "type": test_type} - - -@router.post("/api/v1/drives/{drive_id}/smart/cancel") -async def smart_cancel( - drive_id: int, - body: dict, - db: aiosqlite.Connection = Depends(get_db), -): - """Cancel a running standalone SMART test on a drive.""" - from app import burnin as _burnin - - test_type = (body.get("type") or "").lower() - if test_type not in ("short", "long"): - raise HTTPException(status_code=422, detail="type must be 'short' or 'long'") - - cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,)) - row = await cur.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Drive not found") - devname = row[0] - - client = _burnin._client - if client is None: - raise HTTPException(status_code=503, detail="TrueNAS client not ready") - - from app import ssh_client - - if ssh_client.is_configured(): - # SSH path — abort via smartctl -X - try: - await ssh_client.abort_smart_test(devname) - except Exception as exc: - raise HTTPException(status_code=502, detail=f"SSH abort error: {exc}") - else: - # REST path — find TrueNAS job and abort it - try: - jobs = await client.get_smart_jobs() - tn_job_id = None - for j in 
jobs: - if j.get("state") != "RUNNING": - continue - args = j.get("arguments", []) - if not args or not isinstance(args[0], dict): - continue - if devname in args[0].get("disks", []): - tn_job_id = j["id"] - break - - if tn_job_id is None: - raise HTTPException(status_code=404, detail="No running SMART test found for this drive") - - await client.abort_job(tn_job_id) - except HTTPException: - raise - except Exception as exc: - raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") - - # Update local DB state - now = datetime.now(timezone.utc).isoformat() - await db.execute( - "UPDATE smart_tests SET state='aborted', finished_at=? WHERE drive_id=? AND test_type=? AND state='running'", - (now, drive_id, test_type), - ) - await db.commit() - - return {"cancelled": True, "devname": devname, "type": test_type} - - -# --------------------------------------------------------------------------- -# Burn-in API -# --------------------------------------------------------------------------- - -def _row_to_burnin(row: aiosqlite.Row, stages: list[aiosqlite.Row]) -> BurninJobResponse: - return BurninJobResponse( - id=row["id"], - drive_id=row["drive_id"], - profile=row["profile"], - state=row["state"], - percent=row["percent"] or 0, - stage_name=row["stage_name"], - operator=row["operator"], - created_at=row["created_at"], - started_at=row["started_at"], - finished_at=row["finished_at"], - error_text=row["error_text"], - stages=[ - BurninStageResponse( - id=s["id"], - stage_name=s["stage_name"], - state=s["state"], - percent=s["percent"] or 0, - started_at=s["started_at"], - finished_at=s["finished_at"], - error_text=s["error_text"], - ) - for s in stages - ], - ) - - -# _operator_for is now imported from ._helpers above. 
- - -@router.post("/api/v1/burnin/start") -async def burnin_start(request: Request, req: StartBurninRequest): - operator = _operator_for(request, req.operator) - results = [] - errors = [] - for drive_id in req.drive_ids: - try: - job_id = await burnin.start_job( - drive_id, req.profile, operator, stage_order=req.stage_order - ) - results.append({"drive_id": drive_id, "job_id": job_id}) - except burnin.PoolMemberError as exc: - errors.append({ - "drive_id": drive_id, - "error": str(exc), - "pool_name": exc.pool_name, - "pool_role": exc.pool_role, - "pool_locked": True, - }) - except ValueError as exc: - errors.append({"drive_id": drive_id, "error": str(exc)}) - if errors and not results: - # Surface the first error's structured fields so the UI can render - # an unlock affordance instead of a generic toast. - raise HTTPException(status_code=409, detail=errors[0]) - return {"queued": results, "errors": errors} - - -@router.post("/api/v1/drives/{drive_id}/unlock") -async def unlock_pool_drive(drive_id: int, request: Request, req: UnlockPoolDriveRequest): - operator = _operator_for(request, req.operator) - ip = _client_ip(request) - # Rate-limit by drive AND by source IP. A typo on the confirm token - # is the common case so the threshold is loose, but a brute-force - # attempt to guess the token still hits the IP cap. - keys = (("drive", drive_id), ("ip", ip)) - attempt = auth.unlock_limiter.register(*keys) - if attempt != "ok": - raise HTTPException( - status_code=429, - detail="Too many unlock attempts on this drive. 
Try again later.", - ) - try: - expiry = await burnin.grant_pool_unlock( - drive_id, req.confirm_token, operator, req.reason, - ) - except ValueError as exc: - raise HTTPException(status_code=400, detail=str(exc)) - auth.unlock_limiter.clear(*keys) - return {"unlocked": True, "expires_at": expiry, - # Read from the submodule, not the package-root snapshot - # alias — keeps tests that monkey-patch UNLOCK_TTL_SECONDS - # in app.burnin.unlock observable from the API response. - "ttl_seconds": burnin.unlock.UNLOCK_TTL_SECONDS} - - -@router.post("/api/v1/burnin/{job_id}/cancel") -async def burnin_cancel(job_id: int, request: Request, req: CancelBurninRequest): - operator = _operator_for(request, req.operator) - ok = await burnin.cancel_job(job_id, operator) - if not ok: - raise HTTPException(status_code=409, detail="Job not found or not cancellable") - return {"cancelled": True} - - -# --------------------------------------------------------------------------- -# CSV export -# --------------------------------------------------------------------------- - -@router.get("/api/v1/burnin/export.csv") -async def burnin_export_csv(db: aiosqlite.Connection = Depends(get_db)): - cur = await db.execute(""" - SELECT - bj.id AS job_id, - bj.drive_id, - d.devname, - d.serial, - d.model, - bj.profile, - bj.state, - bj.operator, - bj.created_at, - bj.started_at, - bj.finished_at, - CAST( - (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400 - AS INTEGER - ) AS duration_seconds, - bj.error_text - FROM burnin_jobs bj - JOIN drives d ON d.id = bj.drive_id - ORDER BY bj.id DESC - """) - rows = await cur.fetchall() - - buf = io.StringIO() - writer = csv.writer(buf) - writer.writerow([ - "job_id", "drive_id", "devname", "serial", "model", - "profile", "state", "operator", - "created_at", "started_at", "finished_at", "duration_seconds", - "error_text", - ]) - for r in rows: - writer.writerow(list(r)) - - buf.seek(0) - return StreamingResponse( - iter([buf.getvalue()]), - 
media_type="text/csv", - headers={"Content-Disposition": "attachment; filename=burnin_history.csv"}, - ) - - - - -# --------------------------------------------------------------------------- -# Drive notes / location update -# --------------------------------------------------------------------------- - -@router.patch("/api/v1/drives/{drive_id}") -async def update_drive( - drive_id: int, - req: UpdateDriveRequest, - db: aiosqlite.Connection = Depends(get_db), -): - cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,)) - if not await cur.fetchone(): - raise HTTPException(status_code=404, detail="Drive not found") - - await db.execute( - "UPDATE drives SET notes=?, location=? WHERE id=?", - (req.notes, req.location, drive_id), - ) - await db.commit() - return {"updated": True} - - -@router.post("/api/v1/drives/{drive_id}/reset") -async def reset_drive( - drive_id: int, - request: Request, - body: dict, - db: aiosqlite.Connection = Depends(get_db), -): - """ - Clear SMART test results for a drive so it shows as fresh. - Only allowed when no burn-in job is active (queued or running). - Preserves all job history — just resets the display state. - """ - cur = await db.execute("SELECT id FROM drives WHERE id=?", (drive_id,)) - if not await cur.fetchone(): - raise HTTPException(status_code=404, detail="Drive not found") - - # Reject if any active burn-in - cur = await db.execute( - "SELECT COUNT(*) FROM burnin_jobs WHERE drive_id=? AND state IN ('queued','running')", - (drive_id,), - ) - if (await cur.fetchone())[0] > 0: - raise HTTPException(status_code=409, detail="Cannot reset while a burn-in is active") - - # Trust the logged-in user, not the body (the JS used to send a - # literal "operator" because window._operator was never set). 
- operator = _operator_for(request, body.get("operator")) - - # Reset SMART test state to idle - await db.execute( - """UPDATE smart_tests SET state='idle', percent=0, started_at=NULL, - eta_at=NULL, finished_at=NULL, error_text=NULL, raw_output=NULL - WHERE drive_id=?""", - (drive_id,), - ) - # Clear SMART attrs cache + stamp reset time (hides prior burn-in from dashboard) - now = datetime.now(timezone.utc).isoformat() - await db.execute( - "UPDATE drives SET smart_attrs=NULL, last_reset_at=? WHERE id=?", - (now, drive_id), - ) - - # Audit event - await db.execute( - """INSERT INTO audit_events (event_type, drive_id, operator, message) - VALUES (?,?,?,?)""", - ("drive_reset", drive_id, operator, "Drive reset — SMART state cleared"), - ) - await db.commit() - - poller._notify_subscribers() - return {"reset": True} - - - - - - -# --------------------------------------------------------------------------- - - - -# --------------------------------------------------------------------------- -# Burn-in job detail API (must be after export.csv to avoid int coercion) -# --------------------------------------------------------------------------- - -@router.get("/api/v1/burnin/{job_id}", response_model=BurninJobResponse) -async def burnin_get(job_id: int, db: aiosqlite.Connection = Depends(get_db)): - db.row_factory = aiosqlite.Row - cur = await db.execute("SELECT * FROM burnin_jobs WHERE id=?", (job_id,)) - row = await cur.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Burn-in job not found") - cur = await db.execute( - "SELECT * FROM burnin_stages WHERE burnin_job_id=? 
ORDER BY id", (job_id,) - ) - stages = await cur.fetchall() - return _row_to_burnin(row, stages) diff --git a/app/routes/_drives_helpers.py b/app/routes/_drives_helpers.py new file mode 100644 index 0000000..2bf5cf8 --- /dev/null +++ b/app/routes/_drives_helpers.py @@ -0,0 +1,199 @@ +"""Shared drives helpers — used by routes/drives.py, routes/__init__.py +(for the dashboard + SSE), AND mailer.py (for the daily report). + +This module exists so the drives endpoints can be extracted to their +own file without making mailer's `from app.routes import _fetch_drives_ +for_template` break. The package re-exports `_fetch_drives_for_template` +on its `app.routes` namespace for that backward-compat shim. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import aiosqlite + +from app import burnin +from app.models import DriveResponse, SmartTestState + +from ._helpers import is_stale + + +def _eta_seconds(eta_at: str | None) -> int | None: + if not eta_at: + return None + try: + eta_ts = datetime.fromisoformat(eta_at) + if eta_ts.tzinfo is None: + eta_ts = eta_ts.replace(tzinfo=timezone.utc) + remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds() + return max(0, int(remaining)) + except Exception: + return None + + +def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None: + """Linear ETA extrapolation from started_at and percent complete.""" + if not started_at or percent <= 0: + return None + try: + start = datetime.fromisoformat(started_at) + if start.tzinfo is None: + start = start.replace(tzinfo=timezone.utc) + elapsed = (datetime.now(timezone.utc) - start).total_seconds() + total_est = elapsed / (percent / 100) + remaining = max(0, int(total_est - elapsed)) + return remaining + except Exception: + return None + + +def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState: + eta_at = row[f"{prefix}_eta_at"] + return SmartTestState( + state=row[f"{prefix}_state"] or "idle", + 
percent=row[f"{prefix}_percent"], + eta_seconds=_eta_seconds(eta_at), + eta_timestamp=eta_at, + started_at=row[f"{prefix}_started_at"], + finished_at=row[f"{prefix}_finished_at"], + error_text=row[f"{prefix}_error"], + ) + + +def _row_to_drive(row: aiosqlite.Row) -> DriveResponse: + return DriveResponse( + id=row["id"], + devname=row["devname"], + serial=row["serial"], + model=row["model"], + size_bytes=row["size_bytes"], + temperature_c=row["temperature_c"], + smart_health=row["smart_health"] or "UNKNOWN", + last_polled_at=row["last_polled_at"], + is_stale=is_stale(row["last_polled_at"]), + smart_short=_build_smart(row, "short"), + smart_long=_build_smart(row, "long"), + notes=row["notes"], + location=row["location"], + pool_name=row["pool_name"], + pool_role=row["pool_role"], + pool_unlocked_until=burnin.unlock_expiry( + row["id"], row["pool_name"], row["pool_role"], + ), + ) + + +def _compute_status(drive: dict) -> str: + short = (drive.get("smart_short") or {}).get("state", "idle") + long_ = (drive.get("smart_long") or {}).get("state", "idle") + health = drive.get("smart_health", "UNKNOWN") + if "running" in (short, long_): + return "running" + if short == "failed" or long_ == "failed" or health == "FAILED": + return "failed" + if "passed" in (short, long_): + return "passed" + return "idle" + + +_DRIVES_QUERY = """ + SELECT + d.id, d.devname, d.serial, d.model, d.size_bytes, + d.temperature_c, d.smart_health, d.last_polled_at, + d.notes, d.location, d.pool_name, d.pool_role, + s.state AS short_state, + s.percent AS short_percent, + s.started_at AS short_started_at, + s.eta_at AS short_eta_at, + s.finished_at AS short_finished_at, + s.error_text AS short_error, + l.state AS long_state, + l.percent AS long_percent, + l.started_at AS long_started_at, + l.eta_at AS long_eta_at, + l.finished_at AS long_finished_at, + l.error_text AS long_error + FROM drives d + LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short' + LEFT JOIN smart_tests l ON 
l.drive_id = d.id AND l.test_type = 'long' + WHERE d.last_seen_at >= datetime('now', '-7 days') + {where} + ORDER BY d.devname +""" + + +async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]: + """Return latest burn-in job (any state) keyed by drive_id. + + Jobs created before the drive's last_reset_at are excluded so the + dashboard burn-in column clears after a reset while history is preserved. + """ + cur = await db.execute(""" + SELECT bj.* + FROM burnin_jobs bj + JOIN drives d ON d.id = bj.drive_id + WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id) + AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at) + """) + rows = await cur.fetchall() + return {r["drive_id"]: dict(r) for r in rows} + + +async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]: + cur = await db.execute(_DRIVES_QUERY.format(where="")) + rows = await cur.fetchall() + burnin_by_drive = await _fetch_burnin_by_drive(db) + + # For burn-ins that include SMART stages, fetch those stages so we can + # mirror their progress/result in the Short/Long SMART columns. + bi_smart_stages: dict[int, dict[str, dict]] = {} # job_id -> {stage_name: row} + bi_ids_with_smart = [ + bi["id"] for bi in burnin_by_drive.values() + if bi["state"] in ("running", "queued") + ] + if bi_ids_with_smart: + placeholders = ",".join("?" * len(bi_ids_with_smart)) + # placeholders is purely structural ("?,?,?"); IDs themselves are + # bound via the parameter tuple. SQL built via concatenation so + # bandit's B608 (which fires on any f-string SQL) doesn't flag it. 
+ sql = ( + "SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, " + " bs.started_at, bs.finished_at, bs.error_text " + "FROM burnin_stages bs " + "WHERE bs.burnin_job_id IN (" + placeholders + ") " + " AND bs.stage_name IN ('short_smart', 'long_smart') " + " AND bs.state IN ('running', 'passed', 'failed')" + ) + cur = await db.execute(sql, bi_ids_with_smart) + for r in await cur.fetchall(): + bi_smart_stages.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r) + + drives = [] + for row in rows: + d = _row_to_drive(row).model_dump() + d["status"] = _compute_status(d) + bi = burnin_by_drive.get(d["id"]) + d["burnin"] = bi + + # Overlay burn-in SMART stage progress/results onto the SMART columns + if bi and bi["id"] in bi_smart_stages: + for stage_name, stage in bi_smart_stages[bi["id"]].items(): + target = "smart_short" if stage_name == "short_smart" else "smart_long" + # Only overlay if the standalone SMART column is idle/empty + existing = d.get(target) or {} + if existing.get("state") not in (None, "idle"): + continue + pct = stage["percent"] or 0 + d[target] = { + "state": stage["state"], + "percent": pct if stage["state"] == "running" else (100 if stage["state"] == "passed" else 0), + "eta_seconds": _compute_eta_seconds(stage["started_at"], pct) if stage["state"] == "running" else None, + "eta_timestamp": None, + "started_at": stage["started_at"], + "finished_at": stage["finished_at"], + "error_text": stage["error_text"], + } + + drives.append(d) + return drives diff --git a/app/routes/burnin.py b/app/routes/burnin.py new file mode 100644 index 0000000..13f287e --- /dev/null +++ b/app/routes/burnin.py @@ -0,0 +1,156 @@ +"""Burn-in endpoints — start, cancel, CSV export, job detail. 
+ + POST /api/v1/burnin/start + POST /api/v1/burnin/{job_id}/cancel + GET /api/v1/burnin/export.csv — must register before /{job_id} + so int("export.csv") doesn't 422 + GET /api/v1/burnin/{job_id} +""" + +from __future__ import annotations + +import csv +import io + +import aiosqlite +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import StreamingResponse + +from app import burnin +from app.database import get_db +from app.models import ( + BurninJobResponse, BurninStageResponse, + CancelBurninRequest, StartBurninRequest, +) + +from ._helpers import operator_for + +router = APIRouter() + + +def _row_to_burnin(row: aiosqlite.Row, stages: list[aiosqlite.Row]) -> BurninJobResponse: + return BurninJobResponse( + id=row["id"], + drive_id=row["drive_id"], + profile=row["profile"], + state=row["state"], + percent=row["percent"] or 0, + stage_name=row["stage_name"], + operator=row["operator"], + created_at=row["created_at"], + started_at=row["started_at"], + finished_at=row["finished_at"], + error_text=row["error_text"], + stages=[ + BurninStageResponse( + id=s["id"], + stage_name=s["stage_name"], + state=s["state"], + percent=s["percent"] or 0, + started_at=s["started_at"], + finished_at=s["finished_at"], + error_text=s["error_text"], + ) + for s in stages + ], + ) + + +@router.post("/api/v1/burnin/start") +async def burnin_start(request: Request, req: StartBurninRequest): + operator = operator_for(request, req.operator) + results = [] + errors = [] + for drive_id in req.drive_ids: + try: + job_id = await burnin.start_job( + drive_id, req.profile, operator, stage_order=req.stage_order + ) + results.append({"drive_id": drive_id, "job_id": job_id}) + except burnin.PoolMemberError as exc: + errors.append({ + "drive_id": drive_id, + "error": str(exc), + "pool_name": exc.pool_name, + "pool_role": exc.pool_role, + "pool_locked": True, + }) + except ValueError as exc: + errors.append({"drive_id": drive_id, "error": str(exc)}) + if errors 
# Route order matters: this literal path has to be registered ahead of
# /api/v1/burnin/{job_id}. Otherwise the parameterised route is tried first,
# int("export.csv") fails coercion, and the request is answered with a 422.

@router.get("/api/v1/burnin/export.csv")
async def burnin_export_csv(db: aiosqlite.Connection = Depends(get_db)):
    """Return the burn-in job history as a downloadable CSV file.

    One line per ``burnin_jobs`` row that has a matching ``drives`` row
    (inner join), highest job id first. ``duration_seconds`` is computed
    in SQL as the ``julianday`` difference between ``finished_at`` and
    ``started_at`` multiplied by 86400 and cast to an integer.
    """
    header = (
        "job_id", "drive_id", "devname", "serial", "model",
        "profile", "state", "operator",
        "created_at", "started_at", "finished_at", "duration_seconds",
        "error_text",
    )
    cursor = await db.execute("""
        SELECT
            bj.id AS job_id,
            bj.drive_id,
            d.devname,
            d.serial,
            d.model,
            bj.profile,
            bj.state,
            bj.operator,
            bj.created_at,
            bj.started_at,
            bj.finished_at,
            CAST(
                (julianday(bj.finished_at) - julianday(bj.started_at)) * 86400
                AS INTEGER
            ) AS duration_seconds,
            bj.error_text
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        ORDER BY bj.id DESC
    """)
    records = await cursor.fetchall()

    # The whole document is rendered in memory and sent as a single chunk.
    out = io.StringIO()
    sheet = csv.writer(out)
    sheet.writerow(header)
    sheet.writerows(list(record) for record in records)

    return StreamingResponse(
        iter([out.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=burnin_history.csv"},
    )
@router.get("/api/v1/drives/{drive_id}/drawer")
async def drive_drawer(drive_id: int, db: aiosqlite.Connection = Depends(get_db)):
    """Return the data shown in a drive's log drawer.

    The payload bundles a short drive summary, the most recent burn-in job
    with its stages (including ``log_text`` and ``bad_blocks``), the short
    and long SMART test cards with their stored raw output, the cached
    SMART attributes, and the 50 newest audit events for the drive.

    Raises:
        HTTPException: 404 when no drive matches ``drive_id``.
    """
    cur = await db.execute(_DRIVES_QUERY.format(where="AND d.id = ?"), (drive_id,))
    drive_row = await cur.fetchone()
    if not drive_row:
        raise HTTPException(status_code=404, detail="Drive not found")
    drive = _row_to_drive(drive_row)

    # Latest burn-in job + its stages (include log_text and bad_blocks)
    cur = await db.execute(
        "SELECT * FROM burnin_jobs WHERE drive_id=? ORDER BY id DESC LIMIT 1",
        (drive_id,),
    )
    job_row = await cur.fetchone()
    burnin_job = None
    if job_row:
        burnin_job = dict(job_row)
        cur = await db.execute(
            "SELECT id, stage_name, state, percent, started_at, finished_at, "
            "duration_seconds, error_text, log_text, bad_blocks "
            "FROM burnin_stages WHERE burnin_job_id=? ORDER BY id",
            (job_row["id"],),
        )
        burnin_job["stages"] = [dict(r) for r in await cur.fetchall()]

    # SMART raw output from smart_tests table, keyed by test type
    cur = await db.execute(
        "SELECT test_type, state, percent, started_at, finished_at, error_text, raw_output "
        "FROM smart_tests WHERE drive_id=?",
        (drive_id,),
    )
    smart_rows = {r["test_type"]: dict(r) for r in await cur.fetchall()}

    # Cached SMART attributes (JSON blob on drives table)
    smart_attrs = None
    cur = await db.execute("SELECT smart_attrs FROM drives WHERE id=?", (drive_id,))
    attrs_row = await cur.fetchone()
    if attrs_row and attrs_row["smart_attrs"]:
        try:
            smart_attrs = _json.loads(attrs_row["smart_attrs"])
        except (ValueError, TypeError):
            # Best effort: a corrupt or non-text cache blob just means the
            # drawer shows no attributes. Only decode failures are absorbed
            # (JSONDecodeError is a ValueError); anything else surfaces.
            smart_attrs = None

    # Last 50 audit events for this drive (newest first)
    cur = await db.execute("""
        SELECT id, event_type, operator, message, created_at
        FROM audit_events
        WHERE drive_id = ?
        ORDER BY id DESC
        LIMIT 50
    """, (drive_id,))
    events = [dict(r) for r in await cur.fetchall()]

    def _smart_card(test_type: str) -> dict:
        """Merge the drive's SMART state model with the stored raw output."""
        smart_obj = drive.smart_short if test_type == "short" else drive.smart_long
        card = smart_obj.model_dump() if smart_obj else {}
        test_row = smart_rows.get(test_type, {})
        card["raw_output"] = test_row.get("raw_output")
        return card

    return {
        "drive": {
            "id": drive.id,
            "devname": drive.devname,
            "serial": drive.serial,
            "model": drive.model,
            "size_bytes": drive.size_bytes,
        },
        "burnin": burnin_job,
        "smart": {
            "short": _smart_card("short"),
            "long": _smart_card("long"),
            "attrs": smart_attrs,
        },
        "events": events,
    }
+ """ + from app import ssh_client + + test_type = (body.get("type") or "").upper() + if test_type not in ("SHORT", "LONG"): + raise HTTPException(status_code=422, detail="type must be SHORT or LONG") + + cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,)) + row = await cur.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Drive not found") + devname = row[0] + + now = datetime.now(timezone.utc).isoformat() + ttype_lower = test_type.lower() + + if ssh_client.is_configured(): + # SSH path — works on TrueNAS SCALE 25.10+ and CORE + try: + output = await ssh_client.start_smart_test(devname, test_type) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"SSH error: {exc}") + + # Mark as running in DB (truenas_job_id=NULL signals SSH-managed test) + # Store smartctl start output as proof the test was initiated + await db.execute( + """INSERT INTO smart_tests (drive_id, test_type, state, percent, started_at, raw_output) + VALUES (?,?,?,?,?,?) 
def _find_running_smart_job(jobs, devname):
    """Return the id of the first RUNNING job whose first argument lists ``devname``, else ``None``."""
    for job in jobs:
        if job.get("state") != "RUNNING":
            continue
        args = job.get("arguments", [])
        if not args or not isinstance(args[0], dict):
            continue
        if devname in args[0].get("disks", []):
            return job["id"]
    return None


@router.post("/api/v1/drives/{drive_id}/smart/cancel")
async def smart_cancel(
    drive_id: int,
    body: dict,
    db: aiosqlite.Connection = Depends(get_db),
):
    """Cancel a running standalone SMART test on a drive.

    ``body["type"]`` selects the test (``"short"`` or ``"long"``, any case).
    When SSH is configured the test is aborted over SSH; otherwise the
    matching RUNNING job is looked up through the TrueNAS client and
    aborted. On success the local ``smart_tests`` row that is still
    ``running`` is marked ``aborted``.

    Raises:
        HTTPException: 422 for a missing/invalid ``type``; 404 when the
            drive or (REST path) a running job cannot be found; 503 when
            the REST path is needed but the TrueNAS client is not ready;
            502 when the SSH or TrueNAS call fails.
    """
    # A non-string "type" (number, list, ...) must be rejected with 422,
    # not crash on .lower().
    raw_type = body.get("type")
    test_type = raw_type.lower() if isinstance(raw_type, str) else ""
    if test_type not in ("short", "long"):
        raise HTTPException(status_code=422, detail="type must be 'short' or 'long'")

    cur = await db.execute("SELECT devname FROM drives WHERE id=?", (drive_id,))
    row = await cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Drive not found")
    devname = row[0]

    from app import ssh_client

    if ssh_client.is_configured():
        # SSH path — abort via smartctl -X. The REST client is not used
        # here, so its readiness is deliberately not a precondition.
        try:
            await ssh_client.abort_smart_test(devname)
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"SSH abort error: {exc}") from exc
    else:
        # REST path — find TrueNAS job and abort it
        client = burnin._client
        if client is None:
            raise HTTPException(status_code=503, detail="TrueNAS client not ready")
        try:
            jobs = await client.get_smart_jobs()
            tn_job_id = _find_running_smart_job(jobs, devname)
            if tn_job_id is None:
                raise HTTPException(status_code=404, detail="No running SMART test found for this drive")
            await client.abort_job(tn_job_id)
        except HTTPException:
            # Our own 404 must pass through untouched, not become a 502.
            raise
        except Exception as exc:
            raise HTTPException(status_code=502, detail=f"TrueNAS error: {exc}") from exc

    # Update local DB state
    # NOTE(review): unlike the SSH branch of smart_start and reset_drive,
    # no poller._notify_subscribers() call follows this commit — confirm
    # whether that is intentional.
    now = datetime.now(timezone.utc).isoformat()
    await db.execute(
        "UPDATE smart_tests SET state='aborted', finished_at=? WHERE drive_id=? AND test_type=? AND state='running'",
        (now, drive_id, test_type),
    )
    await db.commit()

    return {"cancelled": True, "devname": devname, "type": test_type}
@router.post("/api/v1/drives/{drive_id}/unlock")
async def unlock_pool_drive(drive_id: int, request: Request, req: UnlockPoolDriveRequest):
    """Grant a temporary override of the pool-membership lock on a drive.

    Each call is first registered with the unlock rate limiter under two
    keys, the drive id and the caller's IP; anything other than ``"ok"``
    yields a 429. A ``ValueError`` from the grant is reported as a 400 and
    leaves the limiter counters in place; a successful grant clears them.
    """
    acting_operator = operator_for(request, req.operator)
    source_ip = client_ip(request)

    # Two buckets: typos on the confirm token are expected, so the limit is
    # lenient, yet token guessing from one address still runs into the IP cap.
    limiter_keys = (("drive", drive_id), ("ip", source_ip))
    verdict = auth.unlock_limiter.register(*limiter_keys)
    if verdict != "ok":
        raise HTTPException(
            status_code=429,
            detail="Too many unlock attempts on this drive. Try again later.",
        )

    try:
        expires_at = await burnin.grant_pool_unlock(
            drive_id, req.confirm_token, acting_operator, req.reason,
        )
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    else:
        auth.unlock_limiter.clear(*limiter_keys)

    # The TTL is looked up on the burnin.unlock submodule at call time
    # rather than via a package-root alias, so tests that monkey-patch
    # UNLOCK_TTL_SECONDS there see their value in the response.
    response = {"unlocked": True, "expires_at": expires_at}
    response["ttl_seconds"] = burnin.unlock.UNLOCK_TTL_SECONDS
    return response