nas-burnin/app/routes/_helpers.py
Brandon Walter aa7822d6ce
Some checks are pending
Security scan / pip-audit (push) Waiting to run
Security scan / bandit (push) Waiting to run
Security scan / gitleaks (push) Waiting to run
Security scan / mypy (push) Waiting to run
feat: rate limiter + mypy + lifecycle tests + routes/ split (1.0.0-33/-34)
Closes the four remaining items from the post-Codex hardening list.

#1 Rate-limit unlock + change-password endpoints (1.0.0-33)
   * Generalised the existing login limiter into a reusable
     `_RateLimiter` class in app/auth.py. Atomic check-then-increment
     in synchronous code so a parallel asyncio burst can't slip past
     the threshold.
   * `unlock_limiter` (5 attempts in 10 min → 10 min lockout) gates
     POST /api/v1/drives/{id}/unlock per-drive AND per-source-IP.
   * `pwchange_limiter` (5 in 10 min → 15 min lockout) gates
     POST /api/v1/auth/change-password per-user AND per-IP.
   * Both clear on successful operation. The login limiter keeps its
     existing `register_login_attempt` / `clear_login_failures`
     facade names so external callers don't change.

#3 mypy in security-scan (1.0.0-33)
   * Added a 4th tool to the daily scan + forge workflow. Runs in a
     throwaway python:3.12-slim container against the deploy dir,
     exit code is informational only (NOT included in the
     `TOTAL_EXIT` failure sum). Findings land in
     ~/security-scans/scan-YYYY-MM-DD/mypy.txt for ratchet-down
     work over time.
   * Forge job uses `continue-on-error: true` so it doesn't fail the
     workflow until the type-debt baseline is annotated down.

#4 Lifecycle test coverage (1.0.0-33)
   * New tests/test_lifecycle.py with 15 cases:
     - TestCommonHelpers (7 tests): _start_stage, _finish_stage
       success/failure/error-preservation, _recalculate_progress
       weighted math, _is_cancelled, _append_stage_log.
     - TestStartCancelJob (4 tests): start_job inserts queued row +
       correct stage list, duplicate-active rejection, cancel marks
       state, cancel returns False on terminal-state jobs.
     - TestRateLimiter (4 tests): under-threshold ok, trips at
       threshold, clear removes both counter + lockout, separate
       keys don't interfere.
   * Total goes from 44 to 59 tests; closes the orchestration-path
     coverage gap Codex flagged.

#2 Partial routes.py split (1.0.0-34)
   * routes.py → routes/ package. Same staged-extraction pattern as
     the burnin.py split.
   * routes/auth.py — login/logout/setup/change-password (170 LoC).
   * routes/system.py — /health, /ws/terminal, /api/v1/updates/check
     (136 LoC).
   * routes/_helpers.py — shared utilities used by both extracted
     modules and the still-monolithic remainder: client_ip,
     operator_for, is_stale, stale_context, secret_status,
     SECRET_FIELDS (97 LoC).
   * routes/__init__.py shrank from 1568 LoC to 1261. Future slices
     can extract drives, burnin, history, settings the same way.
   * GOTCHA recorded in commit body: `from app import auth` at the
     top of __init__.py binds `auth` as an attribute on the package
     namespace, so `from . import auth as _auth_routes` finds the
     OUTER module and yields `app.auth` instead of the submodule.
     Fix is `import app.routes.auth as _auth_routes` (absolute).
     This bit me once at deploy time; container failed to start
     with `module 'app.auth' has no attribute 'router'`.

Verification: 59/59 tests pass (44 existing + 15 new); container
boots clean at 1.0.0-34; /health 200 with all checks green; security
scan still clean (mypy informational findings ignored from totals).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 09:29:53 -04:00

97 lines
3.5 KiB
Python

"""Shared helpers used across multiple route modules.
Anything more than one route file needs lives here. Single-use helpers
stay in their owning route module.
"""
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import HTTPException, Request
from app.config import settings
def client_ip(request: Request) -> str:
"""Best-effort source IP. Trusts X-Forwarded-For when present (we
sit behind nginx-proxy-manager) but falls back to the direct peer."""
fwd = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip()
return fwd or (request.client.host if request.client else "unknown")
def operator_for(request: Request, _ignored_body_value: str | None = None) -> str:
"""Always return the logged-in user's name for audit attribution.
The request body's `operator` field (if any) is ignored — clients
can't spoof the operator identity any more."""
user = getattr(request.state, "current_user", None)
if not user:
raise HTTPException(status_code=401, detail="Authentication required")
return user.full_name or user.username
def is_stale(last_polled_at: str) -> bool:
"""True if the most recent poll is older than the stale threshold."""
try:
last = datetime.fromisoformat(last_polled_at)
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - last).total_seconds() > settings.stale_threshold_seconds
except Exception:
return True
def stale_context(ps: dict) -> dict:
"""Returns the {stale, stale_seconds} dict every HTML page passes
to the layout for the warning banner."""
last = ps.get("last_poll_at")
if not last:
return {"stale": False, "stale_seconds": 0}
try:
t = datetime.fromisoformat(last)
if t.tzinfo is None:
t = t.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - t).total_seconds()
return {
"stale": age > settings.stale_threshold_seconds,
"stale_seconds": int(age),
}
except Exception:
return {"stale": False, "stale_seconds": 0}
# Field names that hold secrets and must never be rendered to the UI
# verbatim or included in the redacted-settings dump.
SECRET_FIELDS = ("smtp_password", "ssh_password", "ssh_key", "truenas_api_key")
def secret_status() -> dict[str, str]:
"""Per-secret display string for the settings page so the operator
can see whether each secret is configured (and how) without ever
rendering the value. Distinguishes env-var, mounted-file, and
DB-stored sources for ssh_key — the others can only come from the
live settings object."""
import os as _os
from app.ssh_client import _MOUNTED_KEY_PATH
def _has(field: str) -> bool:
v = getattr(settings, field, "")
return bool(v)
if _os.environ.get("SSH_KEY"):
ssh_key_status = "set (environment variable)"
elif _has("ssh_key"):
ssh_key_status = "set (stored in settings DB — prefer a mounted secret in production)"
elif _os.path.exists(
_os.environ.get("SSH_KEY_FILE", _MOUNTED_KEY_PATH)
):
ssh_key_status = "set (mounted secret)"
else:
ssh_key_status = "unset"
return {
"smtp_password": "set" if _has("smtp_password") else "unset",
"ssh_password": "set" if _has("ssh_password") else "unset",
"ssh_key": ssh_key_status,
"truenas_api_key": "set" if _has("truenas_api_key") else "unset",
}