"""Shared drives helpers.

Used by routes/drives.py, routes/__init__.py (for the dashboard + SSE), AND
mailer.py (for the daily report).

This module exists so the drives endpoints can be extracted to their own file
without making mailer's `from app.routes import _fetch_drives_for_template`
break. The package re-exports `_fetch_drives_for_template` on its `app.routes`
namespace for that backward-compat shim.
"""

from __future__ import annotations

from datetime import datetime, timezone

import aiosqlite

from app import burnin
from app.models import DriveResponse, SmartTestState

from ._helpers import is_stale

# Failures that mean "this timestamp/percent cannot produce an ETA":
# TypeError for a non-string timestamp, ValueError for a malformed ISO string,
# ArithmeticError (overflow / division) for the extrapolation math. ETA is a
# best-effort display value, so these degrade to ``None`` instead of raising.
_ETA_ERRORS = (TypeError, ValueError, ArithmeticError)


def _parse_utc(timestamp: str) -> datetime:
    """Parse an ISO-8601 string into an aware datetime.

    A value without timezone information is treated as UTC.

    Raises:
        ValueError: if *timestamp* is not a valid ISO-8601 string.
        TypeError: if *timestamp* is not a string.
    """
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _eta_seconds(eta_at: str | None) -> int | None:
    """Return whole seconds from now until *eta_at*, clamped at 0.

    Returns ``None`` when *eta_at* is empty or cannot be parsed.
    """
    if not eta_at:
        return None
    try:
        remaining = (_parse_utc(eta_at) - datetime.now(timezone.utc)).total_seconds()
        return max(0, int(remaining))
    except _ETA_ERRORS:
        return None


def _compute_eta_seconds(started_at: str | None, percent: int | None) -> int | None:
    """Linear ETA extrapolation from started_at and percent complete.

    Returns the estimated remaining whole seconds (never negative), or
    ``None`` when there is no start time, no positive progress yet, or the
    start time cannot be parsed.
    """
    # ``percent is None`` is checked explicitly: comparing None with 0 would
    # raise TypeError before the best-effort ``try`` below is even entered.
    if not started_at or percent is None or percent <= 0:
        return None
    try:
        elapsed = (datetime.now(timezone.utc) - _parse_utc(started_at)).total_seconds()
        total_est = elapsed / (percent / 100)
        return max(0, int(total_est - elapsed))
    except _ETA_ERRORS:
        return None


def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
    """Build a ``SmartTestState`` from the ``{prefix}_*`` columns of *row*.

    *prefix* selects the column family (the query in this module aliases them
    as ``short_*`` and ``long_*``). A missing/empty state is reported as
    ``"idle"``.
    """
    eta_at = row[f"{prefix}_eta_at"]
    return SmartTestState(
        state=row[f"{prefix}_state"] or "idle",
        percent=row[f"{prefix}_percent"],
        eta_seconds=_eta_seconds(eta_at),
        eta_timestamp=eta_at,
        started_at=row[f"{prefix}_started_at"],
        finished_at=row[f"{prefix}_finished_at"],
        error_text=row[f"{prefix}_error"],
    )


def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
    """Convert one ``_DRIVES_QUERY`` result row into a ``DriveResponse``.

    A NULL/empty ``smart_health`` is reported as ``"UNKNOWN"``.
    """
    return DriveResponse(
        id=row["id"],
        devname=row["devname"],
        serial=row["serial"],
        model=row["model"],
        size_bytes=row["size_bytes"],
        temperature_c=row["temperature_c"],
        smart_health=row["smart_health"] or "UNKNOWN",
        last_polled_at=row["last_polled_at"],
        is_stale=is_stale(row["last_polled_at"]),
        smart_short=_build_smart(row, "short"),
        smart_long=_build_smart(row, "long"),
        notes=row["notes"],
        location=row["location"],
        pool_name=row["pool_name"],
        pool_role=row["pool_role"],
        pool_unlocked_until=burnin.unlock_expiry(
            row["id"],
            row["pool_name"],
            row["pool_role"],
        ),
    )


def _compute_status(drive: dict) -> str:
    """Collapse a drive dict's SMART states and health into one status string.

    Precedence, highest first: ``"running"`` if either test is running;
    ``"failed"`` if either test failed or health is ``"FAILED"``;
    ``"passed"`` if either test passed; otherwise ``"idle"``.
    """
    short = (drive.get("smart_short") or {}).get("state", "idle")
    long_ = (drive.get("smart_long") or {}).get("state", "idle")
    health = drive.get("smart_health", "UNKNOWN")
    if "running" in (short, long_):
        return "running"
    if short == "failed" or long_ == "failed" or health == "FAILED":
        return "failed"
    if "passed" in (short, long_):
        return "passed"
    return "idle"


# One row per drive seen in the last 7 days, with its short and long SMART
# test rows flattened into ``short_*`` / ``long_*`` columns. ``{where}`` is a
# ``str.format`` slot for an optional extra predicate (pass "" for none).
_DRIVES_QUERY = """
    SELECT
        d.id, d.devname, d.serial, d.model, d.size_bytes,
        d.temperature_c, d.smart_health, d.last_polled_at,
        d.notes, d.location, d.pool_name, d.pool_role,
        s.state       AS short_state,
        s.percent     AS short_percent,
        s.started_at  AS short_started_at,
        s.eta_at      AS short_eta_at,
        s.finished_at AS short_finished_at,
        s.error_text  AS short_error,
        l.state       AS long_state,
        l.percent     AS long_percent,
        l.started_at  AS long_started_at,
        l.eta_at      AS long_eta_at,
        l.finished_at AS long_finished_at,
        l.error_text  AS long_error
    FROM drives d
    LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
    LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
    WHERE d.last_seen_at >= datetime('now', '-7 days')
    {where}
    ORDER BY d.devname
"""


async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
    """Return latest burn-in job (any state) keyed by drive_id.

    Jobs created before the drive's last_reset_at are excluded so the
    dashboard burn-in column clears after a reset while history is preserved.
    """
    cur = await db.execute("""
        SELECT bj.*
        FROM burnin_jobs bj
        JOIN drives d ON d.id = bj.drive_id
        WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
          AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at)
    """)
    rows = await cur.fetchall()
    return {r["drive_id"]: dict(r) for r in rows}


async def _fetch_burnin_smart_stages(
    db: aiosqlite.Connection, job_ids: list[int]
) -> dict[int, dict[str, dict]]:
    """Fetch the SMART stages of the given burn-in jobs.

    Only ``short_smart`` / ``long_smart`` stages in state running, passed or
    failed are returned, as ``{job_id: {stage_name: stage_row_dict}}``.
    No query is issued when *job_ids* is empty.
    """
    if not job_ids:
        return {}

    placeholders = ",".join("?" * len(job_ids))
    # placeholders is purely structural ("?,?,?"); IDs themselves are
    # bound via the parameter tuple. SQL built via concatenation so
    # bandit's B608 (which fires on any f-string SQL) doesn't flag it.
    sql = (
        "SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, "
        " bs.started_at, bs.finished_at, bs.error_text "
        "FROM burnin_stages bs "
        "WHERE bs.burnin_job_id IN (" + placeholders + ") "
        " AND bs.stage_name IN ('short_smart', 'long_smart') "
        " AND bs.state IN ('running', 'passed', 'failed')"
    )
    cur = await db.execute(sql, job_ids)

    stages_by_job: dict[int, dict[str, dict]] = {}
    for r in await cur.fetchall():
        stages_by_job.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r)
    return stages_by_job


def _overlay_burnin_smart(drive: dict, stages: dict[str, dict]) -> None:
    """Mirror burn-in SMART stage progress/results onto *drive* in place.

    For each stage, the matching ``smart_short`` / ``smart_long`` entry is
    replaced, but only when that standalone SMART column is idle or empty so
    a real standalone test result is never hidden.
    """
    for stage_name, stage in stages.items():
        target = "smart_short" if stage_name == "short_smart" else "smart_long"

        # Only overlay if the standalone SMART column is idle/empty
        existing = drive.get(target) or {}
        if existing.get("state") not in (None, "idle"):
            continue

        state = stage["state"]
        pct = stage["percent"] or 0
        running = state == "running"
        if running:
            shown_percent = pct
        elif state == "passed":
            shown_percent = 100
        else:
            shown_percent = 0

        drive[target] = {
            "state": state,
            "percent": shown_percent,
            # Burn-in stages carry no stored ETA, so extrapolate while running.
            "eta_seconds": (
                _compute_eta_seconds(stage["started_at"], pct) if running else None
            ),
            "eta_timestamp": None,
            "started_at": stage["started_at"],
            "finished_at": stage["finished_at"],
            "error_text": stage["error_text"],
        }


async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
    """Return every recently-seen drive as a template-ready dict.

    Each dict is the dumped ``DriveResponse`` plus:

    * ``status`` - the summary from ``_compute_status``;
    * ``burnin`` - the drive's latest burn-in job row as a dict, or ``None``.

    For drives whose latest burn-in job is running or queued, that job's
    SMART stages are mirrored into the Short/Long SMART columns when the
    standalone column is idle.
    """
    cur = await db.execute(_DRIVES_QUERY.format(where=""))
    rows = await cur.fetchall()

    burnin_by_drive = await _fetch_burnin_by_drive(db)

    # Only active burn-in jobs can contribute live SMART stage data.
    active_job_ids = [
        bi["id"]
        for bi in burnin_by_drive.values()
        if bi["state"] in ("running", "queued")
    ]
    bi_smart_stages = await _fetch_burnin_smart_stages(db, active_job_ids)

    drives = []
    for row in rows:
        d = _row_to_drive(row).model_dump()
        # NOTE(review): status is derived before the burn-in overlay below, so
        # it reflects only the standalone SMART state - confirm this is intended.
        d["status"] = _compute_status(d)
        bi = burnin_by_drive.get(d["id"])
        d["burnin"] = bi

        # Overlay burn-in SMART stage progress/results onto the SMART columns
        if bi and bi["id"] in bi_smart_stages:
            _overlay_burnin_smart(d, bi_smart_stages[bi["id"]])

        drives.append(d)
    return drives