nas-burnin/app/routes/_drives_helpers.py
Brandon Walter 40dac9090d refactor: extract drives + burnin routes (1.0.0-37)
Largest routes/ slice yet — drives.py (8 endpoints) and burnin.py
(4 endpoints). Drives helpers live in _drives_helpers.py so the
dashboard SSE handler in routes/__init__.py and mailer.py can both
keep using them via re-export.

routes/__init__.py shrinks from 815 → 163 LoC; only the dashboard /
and /sse/drives stream remain there. Routes split is now functionally
complete: 12 files, ~1800 LoC distributed by feature.
2026-05-03 09:59:15 -04:00

199 lines
7.5 KiB
Python

"""Shared drives helpers — used by routes/drives.py, routes/__init__.py
(for the dashboard + SSE), and mailer.py (for the daily report).

This module exists so the drives endpoints can live in their own file
without breaking mailer's
`from app.routes import _fetch_drives_for_template`. The `app.routes`
package re-exports `_fetch_drives_for_template` on its namespace as a
backward-compatibility shim.
"""
from __future__ import annotations
from datetime import datetime, timezone
import aiosqlite
from app import burnin
from app.models import DriveResponse, SmartTestState
from ._helpers import is_stale
def _eta_seconds(eta_at: str | None) -> int | None:
    """Return the whole seconds remaining until ``eta_at``.

    Args:
        eta_at: ISO-8601 timestamp of the expected completion time. A naive
            timestamp (no UTC offset) is interpreted as UTC. A trailing
            ``Z`` designator is accepted on every Python version.

    Returns:
        Seconds from now until ``eta_at``, truncated to an int and clamped
        at 0 for timestamps already in the past. ``None`` when ``eta_at``
        is empty or cannot be parsed.
    """
    if not eta_at:
        return None
    try:
        # datetime.fromisoformat() only understands the "Z" suffix from
        # Python 3.11 on; spell it as an explicit offset so UTC timestamps
        # parse the same way on older interpreters.
        text = eta_at[:-1] + "+00:00" if eta_at.endswith("Z") else eta_at
        eta_ts = datetime.fromisoformat(text)
        if eta_ts.tzinfo is None:
            eta_ts = eta_ts.replace(tzinfo=timezone.utc)
        remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds()
        return max(0, int(remaining))
    except (ValueError, TypeError, AttributeError, ArithmeticError):
        # Best effort: the ETA is display-only, so a malformed or non-string
        # value degrades to "unknown" instead of failing the request.
        return None
def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None:
    """Linear ETA extrapolation from ``started_at`` and percent complete.

    Assumes progress is linear in time: if ``percent`` of the work took the
    time elapsed since ``started_at``, the full run takes
    ``elapsed / (percent / 100)``.

    Args:
        started_at: ISO-8601 start timestamp. A naive timestamp is
            interpreted as UTC; a trailing ``Z`` is accepted on every
            Python version.
        percent: Progress so far, on a 0-100 scale.

    Returns:
        Estimated seconds remaining, truncated to an int and clamped at 0.
        ``None`` when ``started_at`` is empty or unparseable, or when
        ``percent`` is missing or not positive (nothing to extrapolate from).
    """
    # A NULL percent used to raise TypeError on the comparison below; treat
    # it like "no progress yet".
    if not started_at or percent is None or percent <= 0:
        return None
    try:
        # datetime.fromisoformat() only understands the "Z" suffix from
        # Python 3.11 on; normalise it to an explicit UTC offset.
        text = (
            started_at[:-1] + "+00:00"
            if started_at.endswith("Z")
            else started_at
        )
        start = datetime.fromisoformat(text)
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)
        elapsed = (datetime.now(timezone.utc) - start).total_seconds()
        total_est = elapsed / (percent / 100)
        return max(0, int(total_est - elapsed))
    except (ValueError, TypeError, AttributeError, ArithmeticError):
        # Best effort: the ETA is display-only, so bad input yields "unknown".
        return None
def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
    """Assemble the SmartTestState for one SMART test type from a drives row.

    ``prefix`` selects which aliased column family to read, e.g. ``"short"``
    reads ``short_state``, ``short_percent`` and so on. A missing (falsy)
    state is reported as ``"idle"``; the stored ETA timestamp is passed
    through and also converted to remaining seconds.
    """
    def column(suffix: str):
        # All per-test columns follow the "<prefix>_<suffix>" alias pattern.
        return row[f"{prefix}_{suffix}"]

    eta_timestamp = column("eta_at")
    return SmartTestState(
        state=column("state") or "idle",
        percent=column("percent"),
        eta_seconds=_eta_seconds(eta_timestamp),
        eta_timestamp=eta_timestamp,
        started_at=column("started_at"),
        finished_at=column("finished_at"),
        error_text=column("error"),
    )
def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
    """Convert one ``_DRIVES_QUERY`` result row into a DriveResponse.

    Most columns are copied verbatim. ``smart_health`` falls back to
    ``"UNKNOWN"`` when empty, staleness is derived from ``last_polled_at``,
    the two SMART states are built from the ``short_*`` / ``long_*`` columns,
    and ``pool_unlocked_until`` comes from ``burnin.unlock_expiry``.
    """
    # Columns that map one-to-one onto DriveResponse fields of the same name.
    passthrough = (
        "id", "devname", "serial", "model", "size_bytes", "temperature_c",
        "last_polled_at", "notes", "location", "pool_name", "pool_role",
    )
    copied = {name: row[name] for name in passthrough}

    health = row["smart_health"] or "UNKNOWN"
    stale = is_stale(copied["last_polled_at"])
    short_state = _build_smart(row, "short")
    long_state = _build_smart(row, "long")
    unlocked_until = burnin.unlock_expiry(
        copied["id"], copied["pool_name"], copied["pool_role"],
    )

    return DriveResponse(
        **copied,
        smart_health=health,
        is_stale=stale,
        smart_short=short_state,
        smart_long=long_state,
        pool_unlocked_until=unlocked_until,
    )
def _compute_status(drive: dict) -> str:
    """Collapse a drive's SMART test states and health into one status word.

    Precedence, highest first: ``"running"`` if either SMART test is
    running; ``"failed"`` if either test failed or ``smart_health`` is
    ``"FAILED"``; ``"passed"`` if either test passed; otherwise ``"idle"``.
    Missing or empty ``smart_short`` / ``smart_long`` entries count as idle.
    """
    test_states = tuple(
        (drive.get(key) or {}).get("state", "idle")
        for key in ("smart_short", "smart_long")
    )
    if "running" in test_states:
        return "running"

    unhealthy = drive.get("smart_health", "UNKNOWN") == "FAILED"
    if "failed" in test_states or unhealthy:
        return "failed"

    return "passed" if "passed" in test_states else "idle"
# SQL template that lists drives whose last_seen_at falls within the last
# 7 days, ordered by devname. The smart_tests table is LEFT JOINed twice
# (test_type 'short' and 'long') and exposed as short_* / long_* columns,
# which are NULL when a drive has no row of that test type.
# `{where}` is a str.format() slot for extra SQL appended after the
# last_seen_at filter; _fetch_drives_for_template fills it with "".
# Because the string goes through str.format(), any literal brace added to
# the SQL would have to be doubled.
_DRIVES_QUERY = """
SELECT
d.id, d.devname, d.serial, d.model, d.size_bytes,
d.temperature_c, d.smart_health, d.last_polled_at,
d.notes, d.location, d.pool_name, d.pool_role,
s.state AS short_state,
s.percent AS short_percent,
s.started_at AS short_started_at,
s.eta_at AS short_eta_at,
s.finished_at AS short_finished_at,
s.error_text AS short_error,
l.state AS long_state,
l.percent AS long_percent,
l.started_at AS long_started_at,
l.eta_at AS long_eta_at,
l.finished_at AS long_finished_at,
l.error_text AS long_error
FROM drives d
LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
WHERE d.last_seen_at >= datetime('now', '-7 days')
{where}
ORDER BY d.devname
"""
async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
    """Map each drive_id to its most recent burn-in job, whatever its state.

    "Most recent" means the highest job id for that drive. If that job was
    created at or before the drive's ``last_reset_at`` it is left out, so a
    reset clears the dashboard burn-in column while the job rows themselves
    stay in the table as history.
    """
    latest_jobs_sql = """
SELECT bj.*
FROM burnin_jobs bj
JOIN drives d ON d.id = bj.drive_id
WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at)
"""
    cursor = await db.execute(latest_jobs_sql)
    jobs_by_drive: dict[int, dict] = {}
    for job_row in await cursor.fetchall():
        # Plain dicts so callers can serialise and mutate them freely.
        jobs_by_drive[job_row["drive_id"]] = dict(job_row)
    return jobs_by_drive
async def _fetch_burnin_smart_stages(
    db: aiosqlite.Connection, job_ids: list[int],
) -> dict[int, dict[str, dict]]:
    """Fetch the SMART stages of the given burn-in jobs.

    Returns ``{burnin_job_id: {stage_name: stage_row_as_dict}}`` restricted
    to the 'short_smart' / 'long_smart' stages whose state is running,
    passed or failed. An empty ``job_ids`` returns ``{}`` without querying.
    """
    stages_by_job: dict[int, dict[str, dict]] = {}
    if not job_ids:
        return stages_by_job
    placeholders = ",".join("?" * len(job_ids))
    # placeholders is purely structural ("?,?,?"); IDs themselves are
    # bound via the parameter list. SQL built via concatenation so
    # bandit's B608 (which fires on any f-string SQL) doesn't flag it.
    sql = (
        "SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, "
        " bs.started_at, bs.finished_at, bs.error_text "
        "FROM burnin_stages bs "
        "WHERE bs.burnin_job_id IN (" + placeholders + ") "
        " AND bs.stage_name IN ('short_smart', 'long_smart') "
        " AND bs.state IN ('running', 'passed', 'failed')"
    )
    cur = await db.execute(sql, job_ids)
    for r in await cur.fetchall():
        stages_by_job.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r)
    return stages_by_job


def _burnin_stage_as_smart(stage: dict) -> dict:
    """Render a burn-in SMART stage row in the shape of a SMART column dict.

    A running stage reports its own percent and a linearly extrapolated ETA;
    a passed stage reports 100 %, any other state 0 %, both with no ETA.
    """
    state = stage["state"]
    running = state == "running"
    pct = stage["percent"] or 0  # NULL percent counts as 0
    if running:
        shown_percent = pct
    elif state == "passed":
        shown_percent = 100
    else:
        shown_percent = 0
    return {
        "state": state,
        "percent": shown_percent,
        "eta_seconds": (
            _compute_eta_seconds(stage["started_at"], pct) if running else None
        ),
        "eta_timestamp": None,
        "started_at": stage["started_at"],
        "finished_at": stage["finished_at"],
        "error_text": stage["error_text"],
    }


async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
    """Return every recently seen drive as a template-ready dict.

    Each dict is the ``DriveResponse`` dump of one ``_DRIVES_QUERY`` row plus
    two extra keys: ``status`` (see ``_compute_status``) and ``burnin`` (the
    drive's latest burn-in job dict, or ``None``).

    For burn-in jobs that are running or queued, the job's SMART stages are
    mirrored into ``smart_short`` / ``smart_long`` whenever the standalone
    SMART column is idle or empty, so the dashboard shows their progress.
    """
    cur = await db.execute(_DRIVES_QUERY.format(where=""))
    rows = await cur.fetchall()
    burnin_by_drive = await _fetch_burnin_by_drive(db)

    # Only active burn-ins can have SMART stages worth mirroring.
    active_job_ids = [
        bi["id"] for bi in burnin_by_drive.values()
        if bi["state"] in ("running", "queued")
    ]
    bi_smart_stages = await _fetch_burnin_smart_stages(db, active_job_ids)

    drives = []
    for row in rows:
        d = _row_to_drive(row).model_dump()
        # NOTE: status is derived before the overlay below, so mirrored
        # burn-in stage states do not influence it.
        d["status"] = _compute_status(d)
        bi = burnin_by_drive.get(d["id"])
        d["burnin"] = bi

        # Overlay burn-in SMART stage progress/results onto the SMART columns
        stages = bi_smart_stages.get(bi["id"], {}) if bi else {}
        for stage_name, stage in stages.items():
            target = "smart_short" if stage_name == "short_smart" else "smart_long"
            # Only overlay if the standalone SMART column is idle/empty
            existing = d.get(target) or {}
            if existing.get("state") not in (None, "idle"):
                continue
            d[target] = _burnin_stage_as_smart(stage)
        drives.append(d)
    return drives