Addresses 12 of 13 findings from the Codex tech-debt + security review of versions 1.0.0-22 through 1.0.0-27. Item #5 (live pool re-check before start_job) deferred — would add an SSH round-trip per start. #1 Pool detection now treats zpool / lsblk / findmnt failures INDEPENDENTLY. Previously a single None blew away the whole map, so a host where lsblk lacks zfs_member info but zpool works would never lock pool members. Extended findmnt parser to recognise /dev/mapper/*, /dev/dm-*, /dev/md*, /dev/da*, /dev/ada* (LVM, devicemapper, MD RAID, FreeBSD CORE devnames). #2 Admin role enforced on every settings mutation. New auth.require_admin() helper applied to GET /settings, POST /api/v1/settings, /test-smtp, /test-ssh. Previously any authenticated user (the CLI explicitly supports non-admin accounts) could rewrite SMTP/SSH/API secrets. #3 First-user setup race closed. auth.create_user() now accepts bootstrap_only=True which wraps the existence check + insert in BEGIN IMMEDIATE so two concurrent /api/v1/auth/setup requests can't both create admin accounts during the bootstrap window. #4 Case-insensitive uniqueness enforced via new `uniq_users_username_nocase` index. Login does NOCASE lookup so without this `Admin` and `admin` could coexist as distinct rows. #6 New `session_cookie_secure` setting (default False for LAN/dev deploys, set True in production behind HTTPS) flips the session cookie's Secure flag. Defends against on-the-wire exposure when the dashboard is reachable over plain HTTP. #7 Audit trail bound to authenticated identity. Burn-in start / cancel / unlock / drive reset all now use `_operator_for(request)` which reads `request.state.current_user.full_name|username` instead of the body's operator field. Logged-in users can no longer spoof attribution. Drive reset's literal-"operator" fallback (window._operator was never set) is also fixed by this. #8 Login rate-limit race fixed. 
New `register_login_attempt()` is atomic check-AND-increment in synchronous code (no awaits inside), so a parallel burst can't slip past the threshold. `record_login_failure()` removed; `clear_login_failures()` now also drops any active lockout for a successful auth. Pre-existing bug where `tripped` was always False (so user_login_locked_out audit events never fired) also fixed. #9 NVMe surface_validate post-format check now mirrors the SSH path: fails on FAILED health AND on real SMART attribute failures, soft-passes SSH-only failures (logged), surfaces warnings to the stage log without failing. #10 retention.backup_db() now writes to `.tmp` then atomic-renames into the canonical daily slot — an interrupted backup leaves the tmp behind but doesn't corrupt the real snapshot. Scheduler marks last_run_date only on (prune AND backup) success so a transient failure gets retried within the 03:00 hour. #11 /health DB probe now exercises the WRITE path via a temp-table INSERT/SELECT/COMMIT round-trip. Previously only read PRAGMA journal_mode + a row count, which silently passes on read-only mounts and broken-WAL conditions. #12 security-scan.sh now fails loudly if `git fetch` or `git reset --hard origin/main` errors (was `|| true`, scanning stale code silently). pip-audit now runs in a throwaway python:3.12-slim container against requirements.txt instead of `docker exec`-ing into the live truenas-burnin container — cleaner separation, no transient package install on prod. #13 Badblocks SSH stage no longer doubles its log_text. Previously appended every 20-line chunk during streaming AND the full accumulated output at end. Now only flushes the un-flushed tail (typically <20 lines). `result["output"]` stays in-memory only. Verification: all 44 unit tests pass in container; /health 200; security scan returns 0 findings; deployed maple build is green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
538 lines
20 KiB
Python
538 lines
20 KiB
Python
"""
|
|
Polling loop — fetches TrueNAS state every POLL_INTERVAL_SECONDS and
|
|
normalizes it into SQLite.
|
|
|
|
Design notes:
|
|
- Opens its own DB connection per cycle (WAL allows concurrent readers).
|
|
- Skips a cycle if TrueNAS is unreachable; marks poller unhealthy.
|
|
- Never overwrites a 'running' state with stale history.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Any
|
|
|
|
import aiosqlite
|
|
|
|
from app.config import settings
|
|
from app.truenas import TrueNASClient
|
|
|
|
log = logging.getLogger(__name__)

# Module-wide poller status. The /health endpoint and the dashboard template
# read it (via get_state()); run() and poll_cycle() write to it. poll_cycle()
# additionally adds "cycle" and "smart_health_cache" keys at runtime.
_state: dict[str, Any] = dict(
    last_poll_at=None,         # ISO-8601 UTC time of the last successful poll
    last_error=None,           # str(exc) of the latest failed poll, cleared on success
    healthy=False,             # True after a successful poll, False after a failure
    drives_seen=0,             # disk count returned by the last successful poll
    consecutive_failures=0,    # reset on success, incremented on every failure
    system_temps={},           # {"cpu_c": int|None, "pch_c": int|None}
    thermal_pressure="ok",     # "ok" | "warn" | "crit" — based on running burn-in drive temps
)

# SSE subscriber queues; _notify_subscribers() pushes to each after a successful poll.
_subscribers: list[asyncio.Queue] = []
|
|
|
|
|
|
def get_state() -> dict:
    """Return a shallow snapshot of the poller status dict.

    The caller receives its own top-level dict, so assigning keys on it never
    touches the live module state; nested values (e.g. system_temps) are
    shared with it.
    """
    snapshot = dict(_state)
    return snapshot
|
|
|
|
|
|
def subscribe() -> asyncio.Queue:
    """Register a new SSE subscriber and hand back its queue.

    The queue holds at most one pending payload; a subscriber that has not
    drained it yet simply misses subsequent notifications until it does.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    _subscribers.append(queue)
    return queue
|
|
|
|
|
|
def unsubscribe(q: asyncio.Queue) -> None:
    """Deregister a subscriber queue; a queue that is not registered is ignored."""
    # No await between the membership test and the removal, so nothing else
    # on the event loop can mutate the list in between.
    if q in _subscribers:
        _subscribers.remove(q)
|
|
|
|
|
|
def _notify_subscribers(alert: dict | None = None) -> None:
    """Push ``{"alert": alert}`` to every subscriber queue without blocking.

    Iterates over a snapshot of the subscriber list. A subscriber whose queue
    is still full (it has not consumed the previous payload) skips this one.
    All subscribers receive the same dict object.
    """
    message = {"alert": alert}
    for queue in tuple(_subscribers):
        try:
            queue.put_nowait(message)
        except asyncio.QueueFull:
            continue  # Client is behind; it will pick up a later update
|
|
|
|
|
|
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with a +00:00 offset."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _eta_from_progress(percent: int, started_iso: str | None) -> str | None:
|
|
"""Linear ETA extrapolation from elapsed time and percent complete."""
|
|
if not started_iso or percent <= 0:
|
|
return None
|
|
try:
|
|
start = datetime.fromisoformat(started_iso)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=timezone.utc)
|
|
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
|
|
total_est = elapsed / (percent / 100)
|
|
remaining = max(0.0, total_est - elapsed)
|
|
return (datetime.now(timezone.utc) + timedelta(seconds=remaining)).isoformat()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _map_history_state(status: str) -> str:
    """Collapse a SMART history status string into "passed" or "failed".

    Only a status containing "without error" (case-insensitive) is a pass;
    anything else, including an empty string, is reported as failed.
    """
    if "without error" not in status.lower():
        return "failed"
    return "passed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _upsert_drive(db: aiosqlite.Connection, disk: dict, now: str,
                        pool_info: dict | None = None,
                        update_pool: bool = True) -> int:
    """Insert or refresh one drive row and return its primary key.

    Args:
        db: open connection; this function does not commit. The final id
            lookup reads the column by name, so a name-indexable row factory
            is expected (poll_cycle sets aiosqlite.Row).
        disk: TrueNAS disk dict. "identifier" and "devname" are required;
            serial, model, size, temperature and smart_health are optional
            (smart_health defaults to "UNKNOWN").
        now: timestamp written to last_seen_at / last_polled_at, and to
            pool_seen_at when pool_info is given.
        pool_info: {"pool": str, "role": str} if this drive is currently in a
            zpool, else None. With update_pool=True a None clears the pool
            columns so a removed-from-pool drive doesn't stay locked.
        update_pool: when False, an existing row keeps its pool_name /
            pool_role / pool_seen_at untouched; callers pass False on
            detection failure so a transient SSH outage doesn't silently
            unlock every drive. A freshly inserted row still takes the pool
            values derived from pool_info (NULL when pool_info is None).

    Returns:
        drives.id of the inserted/updated row.
    """
    if pool_info:
        pool_name, pool_role, pool_seen_at = pool_info["pool"], pool_info["role"], now
    else:
        pool_name = pool_role = pool_seen_at = None

    # SQL is built by concatenation rather than f-string so bandit's B608
    # heuristic (which fires on f-string SQL regardless of source) doesn't
    # flag it. update_clause is always one of these two hardcoded literals;
    # it never carries user input — every value is a bound parameter.
    update_clause = (
        """
        devname = excluded.devname,
        serial = excluded.serial,
        model = excluded.model,
        size_bytes = excluded.size_bytes,
        temperature_c = excluded.temperature_c,
        smart_health = excluded.smart_health,
        last_seen_at = excluded.last_seen_at,
        last_polled_at = excluded.last_polled_at,
        pool_name = excluded.pool_name,
        pool_role = excluded.pool_role,
        pool_seen_at = excluded.pool_seen_at
        """
        if update_pool
        # Detection failed this cycle: no fresh data, so leave
        # pool_name / pool_role / pool_seen_at exactly as stored.
        else """
        devname = excluded.devname,
        serial = excluded.serial,
        model = excluded.model,
        size_bytes = excluded.size_bytes,
        temperature_c = excluded.temperature_c,
        smart_health = excluded.smart_health,
        last_seen_at = excluded.last_seen_at,
        last_polled_at = excluded.last_polled_at
        """
    )
    sql = (
        "INSERT INTO drives "
        "(truenas_disk_id, devname, serial, model, size_bytes, "
        " temperature_c, smart_health, last_seen_at, last_polled_at, "
        " pool_name, pool_role, pool_seen_at) "
        "VALUES (?,?,?,?,?,?,?,?,?,?,?,?) "
        "ON CONFLICT(truenas_disk_id) DO UPDATE SET "
        + update_clause
    )

    disk_id = disk["identifier"]
    params = (
        disk_id,
        disk["devname"],
        disk.get("serial"),
        disk.get("model"),
        disk.get("size"),
        disk.get("temperature"),
        disk.get("smart_health", "UNKNOWN"),
        now,  # last_seen_at
        now,  # last_polled_at
        pool_name,
        pool_role,
        pool_seen_at,
    )
    await db.execute(sql, params)

    lookup = await db.execute(
        "SELECT id FROM drives WHERE truenas_disk_id = ?", (disk_id,)
    )
    found = await lookup.fetchone()
    return found["id"]
|
|
|
|
|
|
async def _upsert_test(db: aiosqlite.Connection, drive_id: int, ttype: str, data: dict) -> None:
    """Insert or overwrite the smart_tests row keyed by (drive_id, ttype).

    data must contain "state"; "percent" defaults to 0 and truenas_job_id,
    started_at, eta_at, finished_at and error_text default to NULL. On
    conflict every column is replaced except started_at, which keeps its
    stored value when the incoming one is NULL (COALESCE). Does not commit.
    """
    optional_keys = ("truenas_job_id", "started_at", "eta_at", "finished_at", "error_text")
    values = (
        drive_id,
        ttype,
        data["state"],
        data.get("percent", 0),
        *(data.get(key) for key in optional_keys),
    )
    statement = """
        INSERT INTO smart_tests
            (drive_id, test_type, state, percent, truenas_job_id,
             started_at, eta_at, finished_at, error_text)
        VALUES (?,?,?,?,?,?,?,?,?)
        ON CONFLICT(drive_id, test_type) DO UPDATE SET
            state = excluded.state,
            percent = excluded.percent,
            truenas_job_id = excluded.truenas_job_id,
            started_at = COALESCE(excluded.started_at, smart_tests.started_at),
            eta_at = excluded.eta_at,
            finished_at = excluded.finished_at,
            error_text = excluded.error_text
    """
    await db.execute(statement, values)
|
|
|
|
|
|
async def _apply_running_job(
    db: aiosqlite.Connection, drive_id: int, ttype: str, job: dict
) -> None:
    """Record an in-flight TrueNAS REST SMART job as a 'running' test row.

    Reads job["progress"]["percent"], job["id"] and (optionally)
    job["time_started"]; the ETA is a linear extrapolation of those.
    finished_at and error_text are cleared. Does not commit.
    """
    percent_done = job["progress"]["percent"]
    row = dict(
        state="running",
        percent=percent_done,
        truenas_job_id=job["id"],
        started_at=job.get("time_started"),
        eta_at=_eta_from_progress(percent_done, job.get("time_started")),
        finished_at=None,
        error_text=None,
    )
    await _upsert_test(db, drive_id, ttype, row)
|
|
|
|
|
|
async def _sync_history(
    db: aiosqlite.Connection,
    client: TrueNASClient,
    drive_id: int,
    devname: str,
    ttype: str,
) -> None:
    """Pull the most recent completed test of type *ttype* from history.

    Called only for a drive+type that is NOT in the active TrueNAS REST
    running-jobs dict. A REST-started test (truenas_job_id set) that is no
    longer listed there has finished, so overwriting its 'running' row is safe.

    Tests started over SSH via smartctl are different: they never appear in
    the REST job list, and their row (state='running', truenas_job_id NULL)
    is advanced by _poll_smart_via_ssh, which selects exactly those rows.
    Overwriting such a row here with an older history entry would hide the
    in-flight test from that poller, so it is left alone while SSH is
    configured. (Without SSH nothing else would ever advance it, so history
    is still applied in that case.)

    A history fetch failure is non-fatal: the row is left unchanged.
    Does not commit.
    """
    from app import ssh_client

    if ssh_client.is_configured():
        cur = await db.execute(
            "SELECT 1 FROM smart_tests "
            "WHERE drive_id = ? AND test_type = ? "
            "AND state = 'running' AND truenas_job_id IS NULL",
            (drive_id, ttype),
        )
        if await cur.fetchone() is not None:
            return  # SSH-managed test in flight; _poll_smart_via_ssh owns it

    try:
        results = await client.get_smart_results(devname)
    except Exception:
        return  # History fetch failure is non-fatal

    if not results:
        return

    want_short = ttype == "short"
    for test in results[0].get("tests") or []:
        # `or ""` guards against keys present with a None value.
        is_short = "short" in (test.get("type") or "").lower()
        if is_short != want_short:
            continue  # Wrong test type

        state = _map_history_state(test.get("status") or "")
        await _upsert_test(db, drive_id, ttype, {
            "state": state,
            "percent": 100 if state == "passed" else 0,
            "truenas_job_id": None,
            "started_at": None,  # NULL keeps any stored started_at (COALESCE)
            "eta_at": None,
            "finished_at": None,
            "error_text": test.get("status_verbose") if state == "failed" else None,
        })
        # Most recent only — NOTE(review): assumes history is listed
        # newest-first; confirm against get_smart_results.
        break
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Poll cycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _poll_smart_via_ssh(db: aiosqlite.Connection, now: str) -> None:
    """Advance SMART tests that were started over SSH instead of REST.

    These are the smart_tests rows with state 'running' and no
    truenas_job_id (TrueNAS SCALE 25.10+ no longer has the REST smart/test
    API). For each row, smartctl progress is fetched over SSH and the row is
    moved to running-with-progress, passed, or failed; an "unknown" state
    leaves it untouched for the next cycle. A per-drive SSH error is logged
    and skipped. Commits once at the end. No-op when SSH is not configured.
    """
    from app import ssh_client

    if not ssh_client.is_configured():
        return

    cur = await db.execute(
        """SELECT st.id, st.test_type, st.drive_id, d.devname, st.started_at
           FROM smart_tests st
           JOIN drives d ON d.id = st.drive_id
           WHERE st.state = 'running' AND st.truenas_job_id IS NULL"""
    )
    pending = await cur.fetchall()
    if not pending:
        return

    for rec in pending:
        test_id, ttype, devname, started_at = rec[0], rec[1], rec[3], rec[4]

        try:
            progress = await ssh_client.poll_smart_progress(devname)
        except Exception as exc:
            log.warning("SSH SMART poll failed for %s: %s", devname, exc)
            continue

        state = progress["state"]
        raw_output = progress.get("output", "")

        if state == "passed":
            await db.execute(
                "UPDATE smart_tests SET state='passed', percent=100, finished_at=?, raw_output=? WHERE id=?",
                (now, raw_output, test_id),
            )
            log.info("SSH SMART %s passed on %s", ttype, devname)
        elif state == "failed":
            await db.execute(
                "UPDATE smart_tests SET state='failed', percent=0, finished_at=?, "
                "error_text=?, raw_output=? WHERE id=?",
                (now, f"SMART {ttype.upper()} test failed", raw_output, test_id),
            )
            log.warning("SSH SMART %s FAILED on %s", ttype, devname)
        elif state == "running":
            remaining = progress.get("percent_remaining")
            # Until smartctl prints its "% remaining" line (the test has just
            # started) report 0% rather than jumping straight to 100%.
            done_pct = 0 if remaining is None else max(0, 100 - remaining)
            await db.execute(
                "UPDATE smart_tests SET percent=?, eta_at=?, raw_output=? WHERE id=?",
                (done_pct, _eta_from_progress(done_pct, started_at), raw_output, test_id),
            )
        # Any other state ("unknown") → keep polling, no update.

    await db.commit()
|
|
|
|
|
|
async def _detect_pools() -> tuple[dict, bool, bool, bool]:
    """Run the three pool-membership probes, each one independently.

    Returns (pool_map, pool_probe_ok, zfs_probe_ok, mounted_probe_ok), where
    pool_map maps devname -> {"pool": str, "role": str}. zpool membership
    entries win; lsblk zfs_member drives not already present are added as
    ("(exported)", "exported"), then findmnt-mounted drives as
    ("(mounted)", "mounted"). A probe that returns None or raises is reported
    as failed and contributes nothing, without discarding the others.
    With SSH unconfigured (mock/dev mode) every probe "succeeds" with an empty
    map, so dev mode never artificially locks drives.
    """
    from app import ssh_client as _ssh

    if not _ssh.is_configured():
        return {}, True, True, True

    probes = (
        ("zpool", _ssh.get_pool_membership),
        ("lsblk", _ssh.get_zfs_member_drives),
        ("findmnt", _ssh.get_mounted_drives),
    )
    outcomes = []
    for label, probe in probes:
        try:
            outcomes.append(await probe())
        except Exception as exc:
            log.warning("Pool detection probe %s raised: %s", label, exc)
            outcomes.append(None)  # same as the probe reporting failure
    pm, zs, ms = outcomes

    pool_map: dict = {}
    if pm is not None:
        pool_map.update(pm)
    for devname in zs or ():
        pool_map.setdefault(devname, {"pool": "(exported)", "role": "exported"})
    for devname in ms or ():
        pool_map.setdefault(devname, {"pool": "(mounted)", "role": "mounted"})
    return pool_map, pm is not None, zs is not None, ms is not None


def _pool_update_for(
    fresh: dict | None,
    existing_role: str | None,
    pool_probe_ok: bool,
    zfs_probe_ok: bool,
    mounted_probe_ok: bool,
) -> tuple[dict | None, bool]:
    """Choose the (pool_info, update_pool) arguments for _upsert_drive.

    - A drive reported by any successful probe is (re)locked with that data.
    - A drive with no stored lock has nothing to preserve.
    - A locked drive that no probe reported this cycle is cleared only when
      the probe that produced its lock succeeded; otherwise its pool columns
      are preserved (fail-closed), so a transient failure of one probe never
      unlocks drives it was responsible for. Role "exported" is attributed to
      the lsblk probe, "mounted" to findmnt, any other role to zpool
      (NOTE(review): assumes zpool never emits those two role names).
    """
    if fresh is not None:
        return fresh, True
    if existing_role is None:
        return None, True
    if existing_role == "exported":
        source_ok = zfs_probe_ok
    elif existing_role == "mounted":
        source_ok = mounted_probe_ok
    else:
        source_ok = pool_probe_ok
    return None, source_ok


async def poll_cycle(client: TrueNASClient) -> int:
    """Run one full poll and persist it to SQLite. Returns number of drives seen.

    Steps: fetch disks and RUNNING REST SMART jobs; merge in temperatures and
    (every Nth cycle, over SSH) SMART health; detect pool membership; upsert
    every drive plus its short/long test rows; then advance SSH-started SMART
    tests. Errors from get_disks / get_smart_jobs / the database propagate to
    the caller (run() counts them as a failed poll).
    """
    now = _now()

    disks = await client.get_disks()
    running_jobs = await client.get_smart_jobs(state="RUNNING")

    # Fetch temperatures via SCALE-specific endpoint.
    # CORE doesn't have this endpoint — silently skip on any error.
    try:
        temps = await client.get_disk_temperatures() or {}
    except Exception:
        temps = {}

    # Inject temperature into each disk dict (SCALE 25.10 has no temp in /disk)
    for disk in disks:
        t = temps.get(disk.get("devname", ""))
        if t is not None:
            disk["temperature"] = int(round(t))

    # SMART health — TrueNAS /api/v2.0/disk doesn't expose smart_health,
    # so without this every drive defaults to UNKNOWN forever (only burn-in
    # stages used to populate it). Run `smartctl -H` over a single SSH
    # session for every drive every Nth cycle. Cache between cycles via
    # _state so the dashboard always renders the most recent answer.
    SMART_HEALTH_EVERY_N_CYCLES = 5  # ~1 minute at default 12s interval
    _state.setdefault("smart_health_cache", {})
    cycle_n = _state.get("cycle", 0) + 1
    _state["cycle"] = cycle_n
    try:
        from app import ssh_client as _ssh
        if _ssh.is_configured() and (cycle_n % SMART_HEALTH_EVERY_N_CYCLES == 1):
            health_map = await _ssh.get_smart_health_map(
                [d["devname"] for d in disks if d.get("devname")]
            )
            if health_map is not None:
                _state["smart_health_cache"] = health_map
    except Exception as exc:
        log.warning("smart_health refresh failed: %s", exc)
    health_cache = _state.get("smart_health_cache") or {}
    for disk in disks:
        h = health_cache.get(disk.get("devname", ""))
        if h:
            disk["smart_health"] = h

    # Pool membership — drives in any zpool (or exported pool / mounted
    # filesystem) are locked from burn-in. Each probe succeeds or fails
    # INDEPENDENTLY: positive detections from successful probes are applied,
    # and a lock is only cleared when the probe that produced it succeeded.
    # Treating a transient SSH blip as "no pool members" would silently
    # unlock drives on the next poll.
    try:
        pool_map, pool_probe_ok, zfs_probe_ok, mounted_probe_ok = await _detect_pools()
    except Exception as exc:
        log.warning("Pool detection failed: %s", exc)
        pool_map, pool_probe_ok, zfs_probe_ok, mounted_probe_ok = {}, False, False, False

    if not (pool_probe_ok and zfs_probe_ok and mounted_probe_ok):
        log.warning(
            "Pool detection partial: pool=%s zfs=%s mounted=%s — preserving "
            "stale lock state from any probe that failed.",
            pool_probe_ok, zfs_probe_ok, mounted_probe_ok,
        )

    # Index running jobs by (devname, test_type)
    active: dict[tuple[str, str], dict] = {}
    for job in running_jobs:
        try:
            args = job["arguments"][0]
            active[(args["disks"][0], args["type"].lower())] = job
        except (KeyError, IndexError, TypeError):
            pass  # Malformed job record; nothing to index

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")

        # Current lock role per drive, so a lock whose source probe failed
        # this cycle can be preserved rather than cleared.
        cur = await db.execute("SELECT truenas_disk_id, pool_role FROM drives")
        existing_roles = {
            r["truenas_disk_id"]: r["pool_role"] for r in await cur.fetchall()
        }

        for disk in disks:
            devname = disk["devname"]
            pool_info, update_pool = _pool_update_for(
                pool_map.get(devname),
                existing_roles.get(disk["identifier"]),
                pool_probe_ok, zfs_probe_ok, mounted_probe_ok,
            )
            drive_id = await _upsert_drive(
                db, disk, now, pool_info, update_pool=update_pool,
            )

            for ttype in ("short", "long"):
                job = active.get((devname, ttype))
                if job is not None:
                    await _apply_running_job(db, drive_id, ttype, job)
                else:
                    await _sync_history(db, client, drive_id, devname, ttype)

        await db.commit()

        # SSH SMART polling — for tests started via smartctl (no TrueNAS REST job)
        await _poll_smart_via_ssh(db, now)

    return len(disks)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Background loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _update_thermal_pressure() -> None:
    """Set _state["thermal_pressure"] from the hottest drive under burn-in.

    Takes MAX(temperature_c) over drives that have a burnin_jobs row in state
    'running': "crit" at or above settings.temp_crit_c, "warn" at or above
    settings.temp_warn_c, otherwise (or when no such temperature exists)
    "ok". Any error falls back to "ok" and is logged.
    """
    try:
        async with aiosqlite.connect(settings.db_path) as tdb:
            tdb.row_factory = aiosqlite.Row
            await tdb.execute("PRAGMA journal_mode=WAL")
            cur = await tdb.execute("""
                SELECT MAX(d.temperature_c)
                FROM drives d
                JOIN burnin_jobs bj ON bj.drive_id = d.id
                WHERE bj.state = 'running' AND d.temperature_c IS NOT NULL
            """)
            row = await cur.fetchone()
        max_t = row[0] if row and row[0] is not None else None
        if max_t is None:
            pressure = "ok"
        elif max_t >= settings.temp_crit_c:
            pressure = "crit"
        elif max_t >= settings.temp_warn_c:
            pressure = "warn"
        else:
            pressure = "ok"
    except Exception as exc:
        # Best-effort indicator: don't fail the poll, but don't hide the error.
        log.warning("Thermal pressure check failed: %s", exc)
        pressure = "ok"
    _state["thermal_pressure"] = pressure


async def run(client: TrueNASClient) -> None:
    """Poll TrueNAS forever, sleeping settings.poll_interval_seconds between cycles.

    On success: refresh the _state health fields, best-effort refresh system
    sensor temps (SSH) and thermal pressure, notify SSE subscribers, and every
    5th successful cycle run the burn-in stuck-job check. On failure: record
    the error in _state and log it, escalating to CRITICAL after 5 consecutive
    failures. asyncio.CancelledError is not an Exception subclass (3.8+), so
    task cancellation still stops the loop.
    """
    log.info("Poller started", extra={"poll_interval": settings.poll_interval_seconds})
    cycle = 0
    while True:
        try:
            count = await poll_cycle(client)
            cycle += 1
            _state["last_poll_at"] = _now()
            _state["last_error"] = None
            _state["healthy"] = True
            _state["drives_seen"] = count
            _state["consecutive_failures"] = 0
            log.debug("Poll OK", extra={"drives": count})

            # System sensor temps via SSH (non-fatal)
            from app import ssh_client as _ssh
            if _ssh.is_configured():
                try:
                    _state["system_temps"] = await _ssh.get_system_sensors()
                except Exception as exc:
                    # Keep the previous reading; debug-level because some hosts
                    # may simply lack these sensors.
                    log.debug("System sensor read failed: %s", exc)

            # Thermal pressure: max temp of drives currently under burn-in
            await _update_thermal_pressure()

            _notify_subscribers()

            # Check for stuck jobs every 5 cycles (~1 min at default 12s interval)
            if cycle % 5 == 0:
                try:
                    from app import burnin as _burnin
                    await _burnin.check_stuck_jobs()
                except Exception as exc:
                    log.error("Stuck-job check failed: %s", exc)

        except Exception as exc:
            failures = _state["consecutive_failures"] + 1
            _state["consecutive_failures"] = failures
            _state["last_error"] = str(exc)
            _state["healthy"] = False
            if failures >= 5:
                log.critical(
                    "Poller has failed %d consecutive times: %s",
                    failures, exc,
                    extra={"consecutive_failures": failures},
                )
            else:
                log.error("Poll failed: %s", exc, extra={"consecutive_failures": failures})

        await asyncio.sleep(settings.poll_interval_seconds)
|