Largest routes/ slice yet — drives.py (8 endpoints) and burnin.py (4 endpoints). Drives helpers live in _drives_helpers.py so the dashboard SSE handler in routes/__init__.py and mailer.py can both keep using them via re-export. routes/__init__.py shrinks from 815 → 163 LoC; only the dashboard / and /sse/drives stream remain there. Routes split is now functionally complete: 12 files, ~1800 LoC distributed by feature.
199 lines
7.5 KiB
Python
199 lines
7.5 KiB
Python
"""Shared drives helpers — used by routes/drives.py, routes/__init__.py
|
|
(for the dashboard + SSE), AND mailer.py (for the daily report).
|
|
|
|
This module exists so the drives endpoints can be extracted to their
|
|
own file without making mailer's `from app.routes import _fetch_drives_
|
|
for_template` break. The package re-exports `_fetch_drives_for_template`
|
|
on its `app.routes` namespace for that backward-compat shim.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
import aiosqlite
|
|
|
|
from app import burnin
|
|
from app.models import DriveResponse, SmartTestState
|
|
|
|
from ._helpers import is_stale
|
|
|
|
|
|
def _eta_seconds(eta_at: str | None) -> int | None:
|
|
if not eta_at:
|
|
return None
|
|
try:
|
|
eta_ts = datetime.fromisoformat(eta_at)
|
|
if eta_ts.tzinfo is None:
|
|
eta_ts = eta_ts.replace(tzinfo=timezone.utc)
|
|
remaining = (eta_ts - datetime.now(timezone.utc)).total_seconds()
|
|
return max(0, int(remaining))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _compute_eta_seconds(started_at: str | None, percent: int) -> int | None:
|
|
"""Linear ETA extrapolation from started_at and percent complete."""
|
|
if not started_at or percent <= 0:
|
|
return None
|
|
try:
|
|
start = datetime.fromisoformat(started_at)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=timezone.utc)
|
|
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
|
|
total_est = elapsed / (percent / 100)
|
|
remaining = max(0, int(total_est - elapsed))
|
|
return remaining
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _build_smart(row: aiosqlite.Row, prefix: str) -> SmartTestState:
|
|
eta_at = row[f"{prefix}_eta_at"]
|
|
return SmartTestState(
|
|
state=row[f"{prefix}_state"] or "idle",
|
|
percent=row[f"{prefix}_percent"],
|
|
eta_seconds=_eta_seconds(eta_at),
|
|
eta_timestamp=eta_at,
|
|
started_at=row[f"{prefix}_started_at"],
|
|
finished_at=row[f"{prefix}_finished_at"],
|
|
error_text=row[f"{prefix}_error"],
|
|
)
|
|
|
|
|
|
def _row_to_drive(row: aiosqlite.Row) -> DriveResponse:
|
|
return DriveResponse(
|
|
id=row["id"],
|
|
devname=row["devname"],
|
|
serial=row["serial"],
|
|
model=row["model"],
|
|
size_bytes=row["size_bytes"],
|
|
temperature_c=row["temperature_c"],
|
|
smart_health=row["smart_health"] or "UNKNOWN",
|
|
last_polled_at=row["last_polled_at"],
|
|
is_stale=is_stale(row["last_polled_at"]),
|
|
smart_short=_build_smart(row, "short"),
|
|
smart_long=_build_smart(row, "long"),
|
|
notes=row["notes"],
|
|
location=row["location"],
|
|
pool_name=row["pool_name"],
|
|
pool_role=row["pool_role"],
|
|
pool_unlocked_until=burnin.unlock_expiry(
|
|
row["id"], row["pool_name"], row["pool_role"],
|
|
),
|
|
)
|
|
|
|
|
|
def _compute_status(drive: dict) -> str:
|
|
short = (drive.get("smart_short") or {}).get("state", "idle")
|
|
long_ = (drive.get("smart_long") or {}).get("state", "idle")
|
|
health = drive.get("smart_health", "UNKNOWN")
|
|
if "running" in (short, long_):
|
|
return "running"
|
|
if short == "failed" or long_ == "failed" or health == "FAILED":
|
|
return "failed"
|
|
if "passed" in (short, long_):
|
|
return "passed"
|
|
return "idle"
|
|
|
|
|
|
_DRIVES_QUERY = """
|
|
SELECT
|
|
d.id, d.devname, d.serial, d.model, d.size_bytes,
|
|
d.temperature_c, d.smart_health, d.last_polled_at,
|
|
d.notes, d.location, d.pool_name, d.pool_role,
|
|
s.state AS short_state,
|
|
s.percent AS short_percent,
|
|
s.started_at AS short_started_at,
|
|
s.eta_at AS short_eta_at,
|
|
s.finished_at AS short_finished_at,
|
|
s.error_text AS short_error,
|
|
l.state AS long_state,
|
|
l.percent AS long_percent,
|
|
l.started_at AS long_started_at,
|
|
l.eta_at AS long_eta_at,
|
|
l.finished_at AS long_finished_at,
|
|
l.error_text AS long_error
|
|
FROM drives d
|
|
LEFT JOIN smart_tests s ON s.drive_id = d.id AND s.test_type = 'short'
|
|
LEFT JOIN smart_tests l ON l.drive_id = d.id AND l.test_type = 'long'
|
|
WHERE d.last_seen_at >= datetime('now', '-7 days')
|
|
{where}
|
|
ORDER BY d.devname
|
|
"""
|
|
|
|
|
|
async def _fetch_burnin_by_drive(db: aiosqlite.Connection) -> dict[int, dict]:
|
|
"""Return latest burn-in job (any state) keyed by drive_id.
|
|
|
|
Jobs created before the drive's last_reset_at are excluded so the
|
|
dashboard burn-in column clears after a reset while history is preserved.
|
|
"""
|
|
cur = await db.execute("""
|
|
SELECT bj.*
|
|
FROM burnin_jobs bj
|
|
JOIN drives d ON d.id = bj.drive_id
|
|
WHERE bj.id IN (SELECT MAX(id) FROM burnin_jobs GROUP BY drive_id)
|
|
AND (d.last_reset_at IS NULL OR bj.created_at > d.last_reset_at)
|
|
""")
|
|
rows = await cur.fetchall()
|
|
return {r["drive_id"]: dict(r) for r in rows}
|
|
|
|
|
|
async def _fetch_drives_for_template(db: aiosqlite.Connection) -> list[dict]:
|
|
cur = await db.execute(_DRIVES_QUERY.format(where=""))
|
|
rows = await cur.fetchall()
|
|
burnin_by_drive = await _fetch_burnin_by_drive(db)
|
|
|
|
# For burn-ins that include SMART stages, fetch those stages so we can
|
|
# mirror their progress/result in the Short/Long SMART columns.
|
|
bi_smart_stages: dict[int, dict[str, dict]] = {} # job_id -> {stage_name: row}
|
|
bi_ids_with_smart = [
|
|
bi["id"] for bi in burnin_by_drive.values()
|
|
if bi["state"] in ("running", "queued")
|
|
]
|
|
if bi_ids_with_smart:
|
|
placeholders = ",".join("?" * len(bi_ids_with_smart))
|
|
# placeholders is purely structural ("?,?,?"); IDs themselves are
|
|
# bound via the parameter tuple. SQL built via concatenation so
|
|
# bandit's B608 (which fires on any f-string SQL) doesn't flag it.
|
|
sql = (
|
|
"SELECT bs.burnin_job_id, bs.stage_name, bs.state, bs.percent, "
|
|
" bs.started_at, bs.finished_at, bs.error_text "
|
|
"FROM burnin_stages bs "
|
|
"WHERE bs.burnin_job_id IN (" + placeholders + ") "
|
|
" AND bs.stage_name IN ('short_smart', 'long_smart') "
|
|
" AND bs.state IN ('running', 'passed', 'failed')"
|
|
)
|
|
cur = await db.execute(sql, bi_ids_with_smart)
|
|
for r in await cur.fetchall():
|
|
bi_smart_stages.setdefault(r["burnin_job_id"], {})[r["stage_name"]] = dict(r)
|
|
|
|
drives = []
|
|
for row in rows:
|
|
d = _row_to_drive(row).model_dump()
|
|
d["status"] = _compute_status(d)
|
|
bi = burnin_by_drive.get(d["id"])
|
|
d["burnin"] = bi
|
|
|
|
# Overlay burn-in SMART stage progress/results onto the SMART columns
|
|
if bi and bi["id"] in bi_smart_stages:
|
|
for stage_name, stage in bi_smart_stages[bi["id"]].items():
|
|
target = "smart_short" if stage_name == "short_smart" else "smart_long"
|
|
# Only overlay if the standalone SMART column is idle/empty
|
|
existing = d.get(target) or {}
|
|
if existing.get("state") not in (None, "idle"):
|
|
continue
|
|
pct = stage["percent"] or 0
|
|
d[target] = {
|
|
"state": stage["state"],
|
|
"percent": pct if stage["state"] == "running" else (100 if stage["state"] == "passed" else 0),
|
|
"eta_seconds": _compute_eta_seconds(stage["started_at"], pct) if stage["state"] == "running" else None,
|
|
"eta_timestamp": None,
|
|
"started_at": stage["started_at"],
|
|
"finished_at": stage["finished_at"],
|
|
"error_text": stage["error_text"],
|
|
}
|
|
|
|
drives.append(d)
|
|
return drives
|