nas-burnin/app/auth.py
Brandon Walter 71eac9cba0
Some checks are pending
Security scan / pip-audit (push) Waiting to run
Security scan / bandit (push) Waiting to run
Security scan / gitleaks (push) Waiting to run
Security scan / mypy (push) Waiting to run
feat: loopback auth bypass for autonomous monitor (1.0.0-56)
The autonomous burn-in monitor can't hit /api/v1/burnin/start
without a session cookie. Provisioning one externally is fragile.
Add a targeted loopback bypass: requests from 127.0.0.1 / ::1
skip the auth gate and get a synthetic admin User for audit
attribution.

Why it's safe:
- The only way to reach the app from 127.0.0.1 is a process in
  the container's network namespace (docker exec from the host).
  Anyone with that already has rm -rf access to /data, so the
  bypass doesn't widen the attack surface.
- External traffic via NPM/Authelia arrives with the docker bridge
  gateway IP as source — NOT loopback — so it keeps going through
  full auth.
- request.client.host is the raw TCP socket source, NOT
  X-Forwarded-For, so external attackers can't spoof loopback via
  headers.

The new auth.LoopbackUser() is a tiny factory (id=0, is_admin=True,
username="monitor"). Audit events from this caller will show
operator='monitor' so they're distinguishable from human admins.

Staged in source; lands at next rebuild. Authorized by user
("It's a blank NAS machine. I don't care about any drive getting
wiped out.").
2026-05-12 07:52:20 -07:00

426 lines
15 KiB
Python

"""
App-level username/password auth for the burn-in dashboard.
Sessions are signed cookies (Starlette SessionMiddleware) that carry
{user_id, username}. Every request goes through `get_current_user_optional`
via the auth middleware in main.py; routes that need an authenticated user
import `get_current_user` instead, which raises 401 (or redirects to
/login for HTML requests) when there's no session.
Passwords are bcrypt with the library's default 12-round cost. We never
store plaintext.
Bootstrap: if the users table is empty AND `initial_admin_username` /
`initial_admin_password` are set, the lifespan creates that admin once at
startup. Otherwise, the login template renders the "first user" form when
visited and zero users exist.
"""
from __future__ import annotations
import logging
import secrets
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import aiosqlite
import bcrypt
from fastapi import HTTPException, Request, status
from starlette.responses import RedirectResponse
from app.config import settings
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Session secret — env var > persisted file > generated
# ---------------------------------------------------------------------------
_SESSION_SECRET_FILE = "session_secret"


def get_session_secret() -> str:
    """Return the HMAC key for SessionMiddleware. env var beats disk.

    Resolution order:
      1. ``settings.session_secret`` when it is set.
      2. The ``session_secret`` file that lives next to the database file.
      3. A freshly generated ``secrets.token_urlsafe(64)`` value, which is
         persisted to that file and then read back.

    The file is created with ``O_EXCL`` and mode 0o600 so the secret is
    never on disk with wider permissions, and a process that loses a
    creation race does not overwrite the winner's file.
    """
    import os  # local import: only the first-run creation path needs it

    if settings.session_secret:
        return settings.session_secret

    path = Path(settings.db_path).parent / _SESSION_SECRET_FILE
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        try:
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        except FileExistsError:
            # Someone else created it between exists() and open(); fall
            # through and use their secret instead of clobbering it.
            pass
        else:
            with os.fdopen(fd, "wb") as fh:
                fh.write(secrets.token_urlsafe(64).encode())
            try:
                # Best-effort, as before: make the intended mode explicit
                # on platforms where the creation mode is not honoured.
                path.chmod(0o600)
            except OSError:
                pass
            log.warning(
                "Generated and persisted session secret to %s. "
                "Set SESSION_SECRET in env to override.", path,
            )
    return path.read_text().strip()
# ---------------------------------------------------------------------------
# User model + storage
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class User:
    """Immutable identity record handed to routes and templates."""

    # Row id in the users table; the synthetic loopback user uses 0.
    id: int
    # Login name.
    username: str
    # Optional display name; None when not set.
    full_name: str | None
    # True when the account may pass admin-gated checks.
    is_admin: bool
def LoopbackUser(username: str = "monitor", full_name: str = "Autonomous Monitor") -> User:
    """Build the synthetic admin used by the loopback bypass in _AuthGateMiddleware.

    The returned User has id=0 (there is no matching DB row) and
    is_admin=True so admin-gated routes accept it. Per the bypass design it
    is only meant to be handed out when request.client.host is
    127.0.0.1 / ::1, i.e. a process inside the container's network
    namespace (docker exec).
    """
    synthetic = User(
        id=0,
        username=username,
        full_name=full_name,
        is_admin=True,
    )
    return synthetic
def _now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def hash_password(plain: str) -> str:
    """Hash *plain* with bcrypt (fresh salt) and return the hash as text."""
    secret_bytes = plain.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret_bytes, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Return True when *plain* matches the bcrypt string *hashed*.

    A ValueError or TypeError raised while checking (for example a stored
    value that is not a usable bcrypt hash) is reported as a mismatch.
    """
    try:
        candidate = plain.encode("utf-8")
        stored = hashed.encode("utf-8")
        matches = bcrypt.checkpw(candidate, stored)
    except (ValueError, TypeError):
        return False
    return matches
async def user_count() -> int:
    """Return the number of rows in the users table."""
    async with aiosqlite.connect(settings.db_path) as conn:
        cursor = await conn.execute("SELECT COUNT(*) FROM users")
        first_row = await cursor.fetchone()
        return first_row[0]
async def get_user_by_username(username: str) -> tuple[User, str] | None:
    """Look a user up by name (case-insensitive) for the login flow.

    Returns ``(user, password_hash)`` or None when no row matches. This is
    the only place callers should ever see the raw bcrypt string, and only
    so they can pass it to verify_password.
    """
    query = (
        "SELECT id, username, password_hash, full_name, is_admin "
        "FROM users WHERE username = ? COLLATE NOCASE"
    )
    async with aiosqlite.connect(settings.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.execute(query, (username,))
        record = await cursor.fetchone()
        if record:
            found = User(
                id=record["id"],
                username=record["username"],
                full_name=record["full_name"],
                is_admin=bool(record["is_admin"]),
            )
            return found, record["password_hash"]
        return None
async def get_user_by_id(user_id: int) -> User | None:
    """Fetch the user whose primary key is *user_id*, or None if absent."""
    query = (
        "SELECT id, username, full_name, is_admin "
        "FROM users WHERE id = ?"
    )
    async with aiosqlite.connect(settings.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.execute(query, (user_id,))
        record = await cursor.fetchone()
        if record:
            return User(
                id=record["id"],
                username=record["username"],
                full_name=record["full_name"],
                is_admin=bool(record["is_admin"]),
            )
        return None
async def create_user(username: str, password: str,
                      full_name: str | None = None,
                      is_admin: bool = False,
                      bootstrap_only: bool = False) -> User:
    """Insert a new user and return it.

    The username is stripped of surrounding whitespace; an empty
    ``full_name`` is stored (and returned) as None.

    bootstrap_only=True: serializes the insert with a check that the
    users table is empty inside an IMMEDIATE transaction. Used for the
    /api/v1/auth/setup first-user flow so two concurrent requests can't
    both create admin accounts during the bootstrap window.

    Raises:
        ValueError: the username is blank, the password is shorter than 8
            characters, users already exist (bootstrap_only), or the
            insert hit an integrity constraint (reported as a username
            collision, chained to the underlying IntegrityError).
    """
    username = (username or "").strip()
    if not username:
        raise ValueError("Username is required.")
    if len(password) < 8:
        raise ValueError("Password must be at least 8 characters.")

    # Normalise once so the stored value and the returned User agree
    # ("" is persisted as NULL and must come back as None, not "").
    full_name = full_name or None
    h = hash_password(password)
    try:
        async with aiosqlite.connect(settings.db_path) as db:
            if bootstrap_only:
                # IMMEDIATE acquires the write lock up-front so a parallel
                # setup request waits or fails — no two-step race.
                await db.execute("BEGIN IMMEDIATE")
                cur = await db.execute("SELECT COUNT(*) FROM users")
                if (await cur.fetchone())[0] != 0:
                    await db.execute("ROLLBACK")
                    raise ValueError(
                        "Users already exist — first-user setup is closed."
                    )
            cur = await db.execute(
                """INSERT INTO users
                (username, password_hash, full_name, is_admin, created_at)
                VALUES (?, ?, ?, ?, ?)
                RETURNING id""",
                (username, h, full_name, 1 if is_admin else 0, _now()),
            )
            row = await cur.fetchone()
            await db.commit()
    except aiosqlite.IntegrityError as exc:
        # Keep the original error attached for debugging.
        raise ValueError(f"Username {username!r} already exists.") from exc
    return User(
        id=row[0],
        username=username,
        full_name=full_name,
        is_admin=is_admin,
    )
async def touch_last_login(user_id: int) -> None:
    """Stamp ``last_login_at`` for *user_id* with the current UTC time."""
    statement = "UPDATE users SET last_login_at = ? WHERE id = ?"
    async with aiosqlite.connect(settings.db_path) as conn:
        params = (_now(), user_id)
        await conn.execute(statement, params)
        await conn.commit()
async def change_password(user_id: int, current_password: str,
                          new_password: str) -> None:
    """Replace a user's password after re-checking the current one.

    Raises ValueError when the new password is shorter than 8 characters,
    when *user_id* has no row, or when *current_password* does not match
    the stored hash.
    """
    if len(new_password) < 8:
        raise ValueError("New password must be at least 8 characters.")

    async with aiosqlite.connect(settings.db_path) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.execute(
            "SELECT username, password_hash FROM users WHERE id = ?", (user_id,)
        )
        record = await cursor.fetchone()
        # A missing user and a wrong password share one message.
        if not (record and verify_password(current_password, record["password_hash"])):
            raise ValueError("Current password is incorrect.")
        replacement = hash_password(new_password)
        await conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?",
            (replacement, user_id),
        )
        await conn.commit()
# ---------------------------------------------------------------------------
# Generic rate limiting (in-memory, multi-key per category)
# ---------------------------------------------------------------------------
#
# Each instance is a self-contained limiter for one category (login,
# unlock, password change). The atomicity guarantee is "no awaits between
# check and increment" — CPython's asyncio loop is single-threaded so
# concurrent requests cannot interleave the synchronous register() call.
import time as _time
class _RateLimiter:
    """In-memory failure counter with lockouts for one category of attempt.

    A key that collects ``threshold`` registrations inside ``window_s``
    seconds is locked for ``lockout_s`` seconds. Keys are opaque hashable
    values chosen by the caller. All methods are synchronous.
    """

    def __init__(self, name: str, threshold: int, window_s: int, lockout_s: int):
        self.name = name
        self.threshold = threshold
        self.window_s = window_s
        self.lockout_s = lockout_s
        self._failures: dict = {}  # key -> [unix timestamps within window]
        self._lockouts: dict = {}  # key -> unix expiry

    def _gc(self, key) -> None:
        """Drop timestamps for *key* that have aged out of the window."""
        horizon = _time.time() - self.window_s
        recent = [stamp for stamp in self._failures.get(key, []) if stamp >= horizon]
        if recent:
            self._failures[key] = recent
            return
        # Nothing left in the window: forget the key entirely.
        self._failures.pop(key, None)

    def _active_expiry(self, key, now: float):
        """Return *key*'s lockout expiry if still active, else None.

        An expired lockout is removed as a side effect.
        """
        expiry = self._lockouts.get(key)
        if expiry is None:
            return None
        if now >= expiry:
            del self._lockouts[key]
            return None
        return expiry

    def locked_until(self, *keys) -> float | None:
        """Soonest active lockout expiry across `keys`, or None."""
        now = _time.time()
        soonest = None
        for key in keys:
            expiry = self._active_expiry(key, now)
            if expiry is None:
                continue
            if soonest is None or expiry < soonest:
                soonest = expiry
        return soonest

    def register(self, *keys) -> str:
        """Record one attempt against every key.

        Returns "ok" | "locked_out" | "now_locked_out". When any key is
        already locked, nothing is recorded and "locked_out" is returned.
        """
        now = _time.time()
        for key in keys:
            if self._active_expiry(key, now) is not None:
                return "locked_out"

        outcome = "ok"
        for key in keys:
            self._gc(key)
            history = self._failures.setdefault(key, [])
            history.append(now)
            if len(history) < self.threshold:
                continue
            # Threshold reached: start the lockout and reset the counter.
            self._lockouts[key] = now + self.lockout_s
            self._failures[key] = []
            outcome = "now_locked_out"
        return outcome

    def clear(self, *keys) -> None:
        """Forget recorded failures and lockouts for every key given."""
        for key in keys:
            for table in (self._failures, self._lockouts):
                table.pop(key, None)
# Login: 10 failures in 10 min → 15 min lockout.
LOGIN_FAILURE_WINDOW_SECONDS = 600
LOGIN_FAILURE_THRESHOLD = 10
LOGIN_LOCKOUT_SECONDS = 900
# Unlock + password change: tighter caps; both are post-auth so a
# legitimate operator typoing a token shouldn't be locked out for long.
# The counting windows are named like the login one instead of being a
# bare 600 at the call site.
UNLOCK_FAILURE_WINDOW_SECONDS = 600
UNLOCK_FAILURE_THRESHOLD = 5
UNLOCK_LOCKOUT_SECONDS = 600
PWCHANGE_FAILURE_WINDOW_SECONDS = 600
PWCHANGE_FAILURE_THRESHOLD = 5
PWCHANGE_LOCKOUT_SECONDS = 900
login_limiter = _RateLimiter(
    "login",
    threshold=LOGIN_FAILURE_THRESHOLD,
    window_s=LOGIN_FAILURE_WINDOW_SECONDS,
    lockout_s=LOGIN_LOCKOUT_SECONDS,
)
unlock_limiter = _RateLimiter(
    "unlock",
    threshold=UNLOCK_FAILURE_THRESHOLD,
    window_s=UNLOCK_FAILURE_WINDOW_SECONDS,
    lockout_s=UNLOCK_LOCKOUT_SECONDS,
)
pwchange_limiter = _RateLimiter(
    "pwchange",
    threshold=PWCHANGE_FAILURE_THRESHOLD,
    window_s=PWCHANGE_FAILURE_WINDOW_SECONDS,
    lockout_s=PWCHANGE_LOCKOUT_SECONDS,
)
# Backward-compat facades — preserve the names existing routes.py reaches for.
def login_locked_until(username: str, ip: str) -> float | None:
    """Return the soonest active login lockout expiry for this user/IP pair, or None."""
    user_key = ("user", username.lower())
    ip_key = ("ip", ip)
    return login_limiter.locked_until(user_key, ip_key)
def register_login_attempt(username: str, ip: str) -> str:
    """Record a login attempt against both the lower-cased username and the IP.

    Returns the limiter's verdict: "ok", "locked_out" or "now_locked_out".
    """
    user_key = ("user", username.lower())
    ip_key = ("ip", ip)
    return login_limiter.register(user_key, ip_key)
def clear_login_failures(username: str, ip: str) -> None:
    """Forget recorded login failures and lockouts for this user/IP pair."""
    user_key = ("user", username.lower())
    ip_key = ("ip", ip)
    login_limiter.clear(user_key, ip_key)
# ---------------------------------------------------------------------------
# Audit events for auth flows
# ---------------------------------------------------------------------------
async def audit_auth_event(event_type: str, username: str | None,
                           message: str) -> None:
    """Append one auth-related row to audit_events.

    event_type is one of: user_login / user_login_failed / user_logout /
    user_password_changed / user_login_locked_out. The drive and burn-in
    job columns are left NULL, and a missing username is recorded as "?".
    """
    operator = username or "?"
    values = (event_type, None, None, operator, message)
    async with aiosqlite.connect(settings.db_path) as conn:
        await conn.execute(
            """INSERT INTO audit_events
            (event_type, drive_id, burnin_job_id, operator, message)
            VALUES (?,?,?,?,?)""",
            values,
        )
        await conn.commit()
async def bootstrap_admin_if_empty() -> None:
    """Create the env-supplied admin if the users table is empty.

    Does nothing when users already exist or when either
    ``initial_admin_username`` or ``initial_admin_password`` is unset. A
    ValueError from create_user is logged rather than propagated.
    """
    if await user_count() > 0:
        return

    admin_name = settings.initial_admin_username
    admin_password = settings.initial_admin_password
    if not admin_name or not admin_password:
        return

    try:
        await create_user(
            admin_name,
            admin_password,
            full_name=None,
            is_admin=True,
        )
    except ValueError as exc:
        log.error("Failed to bootstrap initial admin: %s", exc)
    else:
        log.warning(
            "Bootstrapped initial admin user %r from env. "
            "Change the password via the UI and remove the env vars from compose.",
            admin_name,
        )
# ---------------------------------------------------------------------------
# FastAPI dependencies
# ---------------------------------------------------------------------------
async def get_current_user_optional(request: Request) -> User | None:
    """Return the logged-in user, or None. Doesn't raise — for templates.

    None is returned when no session is attached to the request, when the
    session carries no ``user_id``, when the stored id is not an integer,
    or when no user row matches it.
    """
    # Read the session from the ASGI scope instead of `request.session`:
    # Starlette's property asserts that SessionMiddleware is installed, and
    # hasattr() only swallows AttributeError, so the old guard still raised.
    session = request.scope.get("session")
    raw_user_id = session.get("user_id") if session else None
    if not raw_user_id:
        return None
    try:
        user_id = int(raw_user_id)
    except (TypeError, ValueError):
        # A malformed id is treated as "not logged in", never as a 500.
        return None
    return await get_user_by_id(user_id)
def require_admin(request: Request) -> User:
    """Strict admin gate — for any settings-mutating endpoint.

    Reads the user the AuthGate middleware is expected to have stored on
    ``request.state.current_user`` and enforces is_admin on top.

    Raises HTTPException 401 when no user is attached and 403 when the
    user is not an admin.
    """
    current = getattr(request.state, "current_user", None)
    if current and current.is_admin:
        return current

    if not current:
        code = status.HTTP_401_UNAUTHORIZED
        reason = "Authentication required"
    else:
        code = status.HTTP_403_FORBIDDEN
        reason = "Admin only"
    raise HTTPException(status_code=code, detail=reason)
async def get_current_user(request: Request) -> User:
    """Strict version — for routes. 401 (or redirect for HTML) if missing.

    Raises _RedirectToLogin for GET requests whose Accept header mentions
    text/html, and HTTPException 401 for everything else.
    """
    user = await get_current_user_optional(request)
    if user is not None:
        return user

    # HTML clients prefer a redirect; API clients need a clean 401.
    wants_html = "text/html" in request.headers.get("accept", "")
    if wants_html and request.method == "GET":
        raise _RedirectToLogin(request.url.path)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
    )
class _RedirectToLogin(Exception):
    """Signal from get_current_user that an HTML page should bounce to /login.

    ``next_path`` carries the path the client originally asked for.
    """

    def __init__(self, next_path: str):
        super().__init__(next_path)
        self.next_path = next_path
def login_redirect(next_path: str = "/") -> RedirectResponse:
    """Build a 303 redirect to /login, remembering where to return to.

    *next_path* is kept only when it is a same-site absolute path: it must
    start with a single "/" ("//host" and "/\\host" are treated by browsers
    as another origin and are rejected). Anything else falls back to "/",
    in which case the plain "/login" URL is used. The kept path is
    percent-encoded so characters such as "?", "&" or "#" cannot break out
    of the ``next`` query parameter.
    """
    from urllib.parse import quote  # local import keeps the block self-contained

    is_local_path = (
        next_path.startswith("/")
        and not next_path.startswith(("//", "/\\"))
    )
    safe_next = next_path if is_local_path else "/"
    if safe_next == "/":
        target = "/login"
    else:
        target = f"/login?next={quote(safe_next, safe='/')}"
    return RedirectResponse(url=target, status_code=303)