nas-burnin/app/auth.py
Brandon Walter d4c0770b9e feat: app-level login + hardening sweep (1.0.0-22 -> 1.0.0-23)
Two layered changes shipped in this branch:

== 1.0.0-22: app-level authentication ==

The dashboard previously had only an IP allowlist. Adds username +
bcrypt password auth, signed-cookie sessions, and a "first user setup"
flow.

* New app/auth.py: User dataclass, bcrypt hash/verify, get_user_by_id/
  username, create_user, touch_last_login, FastAPI `get_current_user`
  dependency. Session secret loaded from SESSION_SECRET env or persisted
  to /data/session_secret.
* New app/auth_cli.py: `python -m app.auth_cli list|reset|add` for
  out-of-band user management. Passwords always read from a TTY prompt.
* Schema: idempotent ALTER for `users` table (id, username unique,
  password_hash, full_name, is_admin, created_at, last_login_at).
* main.py: SessionMiddleware (HMAC-signed cookie, max-age 7 days,
  SameSite=strict — see hardening section) + _AuthGateMiddleware that
  populates request.state.current_user and bounces unauth'd HTML GETs
  to /login while returning 401 JSON for everything else.
* Routes: GET /login renders the first-user-setup form when the users
  table is empty, otherwise the sign-in form; POST /login;
  POST /api/v1/auth/setup (only works while empty); GET|POST /logout.
* Bootstrap: env vars INITIAL_ADMIN_USERNAME + INITIAL_ADMIN_PASSWORD
  create the first admin on startup if both set AND users table empty.
  Ignored thereafter — change passwords via UI or CLI.
* Layout: header shows current_user.full_name|username + Logout link.
  Modal operator field auto-fills from the logged-in user via
  <meta name="default-operator"> rendered in layout (replaces the
  localStorage-only previous behaviour).
* requirements.txt: pinned bcrypt>=4.0,<5.0, itsdangerous>=2.1,
  python-multipart>=0.0.7. First step toward addressing the
  unpinned-deps gotcha.
* New app/templates/login.html with first-user-setup variant.

== 1.0.0-23: hardening sweep ==

Closes the eight-item gap audit:

* DB retention + automated backup. New app/retention.py runs daily at
  03:00 local. Nulls burnin_stages.log_text on stages older than
  retention_log_days (default 35), VACUUMs to reclaim pages, then runs
  `sqlite3 .backup` to /data/backups/app-YYYY-MM-DD.db keeping the
  retention_backup_keep most recent (default 14). Wired into the
  lifespan supervisor next to mailer/poller.
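
A minimal sketch of the backup-and-prune step described above, not the
actual app/retention.py. It assumes the stdlib `sqlite3` module's
`Connection.backup()` (the online backup API) in place of shelling out to
the `sqlite3` CLI; the function name `backup_and_prune` and its parameters
are hypothetical.

```python
from __future__ import annotations

import sqlite3
from datetime import date
from pathlib import Path


def backup_and_prune(db_path: str, backup_dir: str, keep: int = 14,
                     today: date | None = None) -> Path:
    """Snapshot db_path into backup_dir, then keep only the `keep` newest files."""
    out_dir = Path(backup_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = (today or date.today()).isoformat()   # YYYY-MM-DD
    target = out_dir / f"app-{stamp}.db"

    # Connection.backup() copies a consistent snapshot even while other
    # connections are writing to the source database.
    src = sqlite3.connect(db_path)
    dst = sqlite3.connect(target)
    try:
        src.backup(dst)
    finally:
        dst.close()
        src.close()

    # ISO dates sort lexicographically, so a name sort is a date sort.
    backups = sorted(out_dir.glob("app-*.db"))
    for stale in backups[: max(len(backups) - keep, 0)]:
        stale.unlink()
    return target
```

Passing `today` explicitly is only there to make the pruning easy to
exercise; a daily job would leave it unset.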

* CSRF mitigation. SessionMiddleware bumped to SameSite=strict so the
  browser refuses to send the session cookie on cross-site POSTs —
  removes the actual CSRF vector. Trade-off: external links into the
  app require re-auth.

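* Login rate limiting. In-memory per-username AND per-source-IP failure
  counters in auth.py. 10 failures within 10 min on either key trips a
  15-min lockout for that key; a login is refused while either key is
  locked. Returns HTTP 429 with a clear "try again in N min" message.
  Failure counters are cleared on successful login. State is
  per-process and is lost on restart.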

* Login audit events. New event types in audit_events: user_login,
  user_login_failed, user_login_locked_out, user_logout,
  user_password_changed. All include source IP. Recorded via
  auth.audit_auth_event().

* Password change UI. Header link "Change password" opens
  templates/components/modal_password.html (current/new/confirm).
  Posts to POST /api/v1/auth/change-password — bcrypt-verifies current,
  requires >=8 char new pw, writes audit event.

* NVMe burn-in path. _stage_surface_validate now detects nvme*
  devnames and routes to _stage_surface_validate_nvme() which runs
  `nvme format -s 1 --force`. Note that in nvme-cli `-s 1` selects
  user-data erase; cryptographic erase is `-s 2`. Takes seconds vs
  hours of badblocks and exercises the controller's secure-erase.
  Falls back to badblocks if nvme-cli isn't installed. Post-format
  SMART check.
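
The routing described above could be sketched roughly as follows. This is
an illustration only: `surface_validate_command` and its `which` parameter
are hypothetical names, and the badblocks flags (`-wsv`) are an assumption,
since the commit does not state which ones the app passes.

```python
from __future__ import annotations

import shutil
from typing import Callable


def surface_validate_command(
    devname: str,
    which: Callable[[str], str | None] = shutil.which,
) -> list[str]:
    """Pick the surface-validation command for a device name like 'sda' or 'nvme0n1'."""
    nvme_cli = which("nvme") if devname.startswith("nvme") else None
    if nvme_cli:
        # Same flags as quoted in the commit message.
        return [nvme_cli, "format", f"/dev/{devname}", "-s", "1", "--force"]
    # Non-NVMe device, or nvme-cli not installed: fall back to badblocks.
    # ASSUMPTION: destructive write test with progress and verbose output.
    return ["badblocks", "-wsv", f"/dev/{devname}"]
```

Injecting `which` keeps the fallback branch testable on a machine that
does or does not have nvme-cli installed.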

* Mounted-FS detection. ssh_client.get_mounted_drives() runs
  `findmnt -no SOURCE`, parses non-ZFS sources back to base devnames.
  Poller treats them as pool_name='(mounted)', pool_role='mounted'.
  Confirm token DESTROY MOUNTED FILESYSTEM, distinct purple styling,
  audit event mounted_drive_unlocked, daily-report banner picks it up.
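
As a rough sketch of the parsing step above (not the code in
ssh_client.py), the output of `findmnt -no SOURCE` can be reduced to base
device names like this. The helper names and the set of recognised Linux
device prefixes (sd, vd, hd, xvd, nvme, mmcblk) are assumptions.

```python
from __future__ import annotations

import re

# Partitioned devices whose partition suffix is "p<N>".
_P_STYLE = re.compile(r"(nvme\d+n\d+|mmcblk\d+)(?:p\d+)?")
# Classic disks whose partition suffix is a bare number (sda1 -> sda).
_SD_STYLE = re.compile(r"((?:sd|vd|hd|xvd)[a-z]+)\d*")


def base_devname(source: str) -> str | None:
    """Map one findmnt SOURCE value to a base device name, or None."""
    if not source.startswith("/dev/"):
        return None  # ZFS datasets ("tank/data"), tmpfs, proc, ...
    name = source[len("/dev/"):].split("[", 1)[0]  # drop "[/subvol]" suffix
    if "/" in name:
        return None  # /dev/mapper/... and similar need separate resolution
    for pattern in (_P_STYLE, _SD_STYLE):
        match = pattern.fullmatch(name)
        if match:
            return match.group(1)
    return None


def mounted_base_devnames(findmnt_output: str) -> set[str]:
    """Collect the base devices behind every mounted non-ZFS source."""
    names = (base_devname(line.strip()) for line in findmnt_output.splitlines())
    return {n for n in names if n}
```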

* Deeper /health. Real readiness check — DB write probe (PRAGMA
  journal_mode), poller freshness (age <= 3x stale_threshold), SSH
  test_connection() when configured. Returns 503 when any check fails
  so a proxy/orchestrator can take the container out of rotation.
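
One way to picture the readiness logic above: run each named check, and
answer 503 if any of them fails or raises. This is a framework-free
sketch; `run_readiness` and `poller_is_fresh` are hypothetical names and
the real endpoint may be structured differently.

```python
from __future__ import annotations

from typing import Callable


def poller_is_fresh(age_seconds: float, stale_threshold: float) -> bool:
    """Freshness rule from the commit message: age <= 3x stale_threshold."""
    return age_seconds <= 3 * stale_threshold


def run_readiness(
    checks: dict[str, Callable[[], bool]],
) -> tuple[int, dict[str, str]]:
    """Return (http_status, per-check result); 503 unless every check passes."""
    results: dict[str, str] = {}
    for name, check in checks.items():
        try:
            results[name] = "ok" if check() else "failed"
        except Exception as exc:  # a crashing probe counts as not ready
            results[name] = f"error: {exc}"
    status = 200 if all(v == "ok" for v in results.values()) else 503
    return status, results
```

Returning the per-check map alongside the status lets the response body
say which dependency took the container out of rotation.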

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 11:08:29 -04:00

"""
App-level username/password auth for the burn-in dashboard.

Sessions are signed cookies (Starlette SessionMiddleware) that carry
{user_id, username}. Every request goes through `get_current_user_optional`
via the auth middleware in main.py; routes that need an authenticated user
import `get_current_user` instead, which raises 401 (or redirects to
/login for HTML requests) when there's no session.

Passwords are bcrypt with the library's default 12-round cost. We never
store plaintext.

Bootstrap: if the users table is empty AND `initial_admin_username` /
`initial_admin_password` are set, the lifespan creates that admin once at
startup. Otherwise, the login template renders the "first user" form when
visited and zero users exist.
"""
from __future__ import annotations

import logging
import secrets
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

import aiosqlite
import bcrypt
from fastapi import HTTPException, Request, status
from starlette.responses import RedirectResponse

from app.config import settings

log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Session secret: env var > persisted file > generated
# ---------------------------------------------------------------------------
_SESSION_SECRET_FILE = "session_secret"


def get_session_secret() -> str:
    """Return the HMAC key for SessionMiddleware. env var beats disk."""
    if settings.session_secret:
        return settings.session_secret
    path = Path(settings.db_path).parent / _SESSION_SECRET_FILE
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(secrets.token_urlsafe(64).encode())
        try:
            path.chmod(0o600)
        except OSError:
            pass
        log.warning(
            "Generated and persisted session secret to %s. "
            "Set SESSION_SECRET in env to override.", path,
        )
    return path.read_text().strip()

# ---------------------------------------------------------------------------
# User model + storage
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class User:
    id: int
    username: str
    full_name: str | None
    is_admin: bool


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def hash_password(plain: str) -> str:
    return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")


def verify_password(plain: str, hashed: str) -> bool:
    try:
        return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except (ValueError, TypeError):
        return False


async def user_count() -> int:
    async with aiosqlite.connect(settings.db_path) as db:
        cur = await db.execute("SELECT COUNT(*) FROM users")
        return (await cur.fetchone())[0]

async def get_user_by_username(username: str) -> tuple[User, str] | None:
    """Returns (user, password_hash) or None. Hash is the only place
    callers should ever see the raw bcrypt string, for verify_password."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, username, password_hash, full_name, is_admin "
            "FROM users WHERE username = ? COLLATE NOCASE",
            (username,),
        )
        row = await cur.fetchone()
    if not row:
        return None
    user = User(
        id=row["id"],
        username=row["username"],
        full_name=row["full_name"],
        is_admin=bool(row["is_admin"]),
    )
    return user, row["password_hash"]


async def get_user_by_id(user_id: int) -> User | None:
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, username, full_name, is_admin "
            "FROM users WHERE id = ?",
            (user_id,),
        )
        row = await cur.fetchone()
    if not row:
        return None
    return User(
        id=row["id"],
        username=row["username"],
        full_name=row["full_name"],
        is_admin=bool(row["is_admin"]),
    )

async def create_user(username: str, password: str,
                      full_name: str | None = None,
                      is_admin: bool = False) -> User:
    """Insert a new user.

    Raises ValueError if the username is empty or already taken, or if the
    password is shorter than 8 characters.
    """
    username = (username or "").strip()
    if not username:
        raise ValueError("Username is required.")
    if len(password) < 8:
        raise ValueError("Password must be at least 8 characters.")
    h = hash_password(password)
    # Normalise "" to NULL so the returned User matches what is stored.
    stored_full_name = full_name or None
    try:
        async with aiosqlite.connect(settings.db_path) as db:
            cur = await db.execute(
                """INSERT INTO users
                   (username, password_hash, full_name, is_admin, created_at)
                   VALUES (?, ?, ?, ?, ?)
                   RETURNING id""",
                (username, h, stored_full_name, 1 if is_admin else 0, _now()),
            )
            row = await cur.fetchone()
            await db.commit()
    except aiosqlite.IntegrityError:
        # Hide the low-level DB traceback; callers only need the message.
        raise ValueError(f"Username {username!r} already exists.") from None
    return User(
        id=row[0],
        username=username,
        full_name=stored_full_name,
        is_admin=is_admin,
    )

async def touch_last_login(user_id: int) -> None:
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            "UPDATE users SET last_login_at = ? WHERE id = ?",
            (_now(), user_id),
        )
        await db.commit()


async def change_password(user_id: int, current_password: str,
                          new_password: str) -> None:
    """Verify current password and rotate. Raises ValueError on any failure."""
    if len(new_password) < 8:
        raise ValueError("New password must be at least 8 characters.")
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT username, password_hash FROM users WHERE id = ?", (user_id,)
        )
        row = await cur.fetchone()
        if not row or not verify_password(current_password, row["password_hash"]):
            raise ValueError("Current password is incorrect.")
        new_hash = hash_password(new_password)
        await db.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?",
            (new_hash, user_id),
        )
        await db.commit()

# ---------------------------------------------------------------------------
# Login rate limiting (in-memory, per-username + per-source-IP)
# ---------------------------------------------------------------------------
import time as _time

LOGIN_FAILURE_WINDOW_SECONDS = 600   # 10 min
LOGIN_FAILURE_THRESHOLD = 10         # this many failures within the window
LOGIN_LOCKOUT_SECONDS = 900          # then block for 15 min

# {key: [failure_timestamp, ...]} where key = (kind, value), kind in {"user", "ip"}
_login_failures: dict[tuple[str, str], list[float]] = {}
_login_lockouts: dict[tuple[str, str], float] = {}  # key -> unix expiry


def _gc_failures(key: tuple[str, str]) -> None:
    """Drop failure timestamps older than the window."""
    arr = _login_failures.get(key, [])
    cutoff = _time.time() - LOGIN_FAILURE_WINDOW_SECONDS
    fresh = [t for t in arr if t >= cutoff]
    if fresh:
        _login_failures[key] = fresh
    elif key in _login_failures:
        del _login_failures[key]

def login_locked_until(username: str, ip: str) -> float | None:
    """Returns the lockout expiry (unix ts) if either dimension is locked,
    else None. Lazily reaps expired lockouts."""
    now = _time.time()
    soonest = None
    for key in (("user", username.lower()), ("ip", ip)):
        exp = _login_lockouts.get(key)
        if exp is None:
            continue
        if now >= exp:
            del _login_lockouts[key]
            continue
        soonest = exp if soonest is None else min(soonest, exp)
    return soonest


def record_login_failure(username: str, ip: str) -> bool:
    """Returns True if this failure tripped a lockout."""
    tripped = False
    now = _time.time()
    for key in (("user", username.lower()), ("ip", ip)):
        _gc_failures(key)
        _login_failures.setdefault(key, []).append(now)
        if len(_login_failures[key]) >= LOGIN_FAILURE_THRESHOLD:
            _login_lockouts[key] = now + LOGIN_LOCKOUT_SECONDS
            _login_failures[key] = []  # reset counter once lockout armed
            tripped = True
    return tripped


def clear_login_failures(username: str, ip: str) -> None:
    for key in (("user", username.lower()), ("ip", ip)):
        _login_failures.pop(key, None)

# ---------------------------------------------------------------------------
# Audit events for auth flows
# ---------------------------------------------------------------------------
async def audit_auth_event(event_type: str, username: str | None,
                           message: str) -> None:
    """Write a row to audit_events. event_type is one of:
    user_login / user_login_failed / user_logout / user_password_changed /
    user_login_locked_out."""
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            """INSERT INTO audit_events
               (event_type, drive_id, burnin_job_id, operator, message)
               VALUES (?,?,?,?,?)""",
            (event_type, None, None, username or "?", message),
        )
        await db.commit()


async def bootstrap_admin_if_empty() -> None:
    """Create the env-supplied admin if the users table is empty."""
    if await user_count() > 0:
        return
    if not (settings.initial_admin_username and settings.initial_admin_password):
        return
    try:
        await create_user(
            settings.initial_admin_username,
            settings.initial_admin_password,
            full_name=None,
            is_admin=True,
        )
        log.warning(
            "Bootstrapped initial admin user %r from env. "
            "Change the password via the UI and remove the env vars from compose.",
            settings.initial_admin_username,
        )
    except ValueError as exc:
        log.error("Failed to bootstrap initial admin: %s", exc)

# ---------------------------------------------------------------------------
# FastAPI dependencies
# ---------------------------------------------------------------------------
async def get_current_user_optional(request: Request) -> User | None:
    """Return the logged-in user, or None. Doesn't raise; meant for templates."""
    # request.session asserts when SessionMiddleware is not installed, and
    # hasattr() only swallows AttributeError, so check the ASGI scope instead.
    sess_user_id = request.session.get("user_id") if "session" in request.scope else None
    if not sess_user_id:
        return None
    return await get_user_by_id(int(sess_user_id))


async def get_current_user(request: Request) -> User:
    """Strict version for routes. 401 (or redirect for HTML) if missing."""
    user = await get_current_user_optional(request)
    if user is None:
        # HTML clients prefer a redirect; API clients need a clean 401.
        accept = request.headers.get("accept", "")
        if "text/html" in accept and request.method == "GET":
            raise _RedirectToLogin(request.url.path)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
        )
    return user

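from urllib.parse import quote


class _RedirectToLogin(Exception):
    """Raised by get_current_user when an HTML page needs to bounce to /login."""

    def __init__(self, next_path: str):
        super().__init__(next_path)
        self.next_path = next_path


def login_redirect(next_path: str = "/") -> RedirectResponse:
    # Forward only same-origin absolute paths in ?next=. Browsers treat
    # "//host" and "/\host" as protocol-relative URLs, so reject those too.
    is_local = (
        next_path.startswith("/")
        and not next_path.startswith("//")
        and "\\" not in next_path
    )
    safe_next = next_path if is_local else "/"
    # Percent-encode so "?", "&" and "#" in the path cannot alter the query string.
    target = f"/login?next={quote(safe_next, safe='/')}" if safe_next != "/" else "/login"
    return RedirectResponse(url=target, status_code=303)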