"""Login / logout / first-user setup / password change routes.

Public path mounting:
    GET  /login                        — render login or first-user setup form
    POST /login                        — credential check + session bootstrap
    POST /api/v1/auth/setup            — first-user creation (only when zero users)
    GET  /logout                       — clear session, redirect
    POST /logout                       — same, for explicit POST clients
    POST /api/v1/auth/change-password  — rotate password + audit
"""
from __future__ import annotations

import time as _time

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse

from app import auth
from app.renderer import templates
from ._helpers import client_ip

router = APIRouter()


def _safe_next_url(target: object) -> str:
    """Return *target* if it is a same-site absolute path, else ``"/"``.

    A bare ``startswith("/")`` check is not enough to keep a post-login
    redirect on this site: ``//host`` is a protocol-relative URL and
    ``/\\host`` is treated the same way by browsers, and browsers strip
    tab/newline characters before resolving a URL. All of those are
    rejected here, as is any non-string value (e.g. an uploaded file sent
    in the ``next`` form field).
    """
    if not isinstance(target, str) or not target.startswith("/"):
        return "/"
    # Second character "/" or "\" turns the path into an authority component.
    if target[1:2] in ("/", "\\"):
        return "/"
    # Control characters could be stripped by the browser to form "//host".
    if any(ch < " " or ch == "\x7f" for ch in target):
        return "/"
    return target


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, next: str = "/", error: str | None = None):
    """Render ``login.html``.

    The template is told whether the users table is empty (``needs_setup``)
    so it can show the first-user setup form instead of the login form.
    ``next`` is sanitised to a same-site path before being handed to the
    template.
    """
    needs_setup = (await auth.user_count()) == 0
    return templates.TemplateResponse(request, "login.html", {
        "request": request,
        "needs_setup": needs_setup,
        "error": error,
        "next": _safe_next_url(next),
    })


@router.post("/login")
async def login_submit(request: Request):
    """Check submitted credentials and start an authenticated session.

    Responses:
        429 + login page — the attempt was rejected by the login limiter.
        401 + login page — unknown user or wrong password.
        303 redirect     — success; target is the sanitised ``next`` field.
    """
    form = await request.form()
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""
    # Only ever redirect to a path on this site (no "//host" or "/\host").
    next_url = _safe_next_url(form.get("next") or "/")
    ip = client_ip(request)

    # Atomic register-and-check: increments the counter NOW (before any
    # await), so a parallel burst of guesses can't all slip past the
    # threshold. Cleared on successful auth via clear_login_failures.
    attempt = auth.register_login_attempt(username, ip)
    if attempt != "ok":
        if attempt == "now_locked_out":
            await auth.audit_auth_event(
                "user_login_locked_out", username,
                f"Failed login from {ip} — IP/user locked out for {auth.LOGIN_LOCKOUT_SECONDS // 60} min",
            )
        locked_until = auth.login_locked_until(username, ip)
        now = _time.time()
        # Clamp at zero: the lock may have expired (or be absent) by the
        # time we look it up, and the message must never show <= 0 minutes.
        remaining = max(0, int((locked_until or now) - now))
        return templates.TemplateResponse(request, "login.html", {
            "request": request,
            "needs_setup": False,
            "error": f"Too many failed attempts. Try again in {remaining // 60 + 1} min.",
            "next": next_url,
        }, status_code=429)

    found = await auth.get_user_by_username(username)
    if not found or not auth.verify_password(password, found[1]):
        # Constant-ish-time: still call verify on a junk hash if user missing
        # so the timing of "user not found" matches "wrong password."
        if not found:
            auth.verify_password(password, "$2b$12$" + "x" * 53)
        await auth.audit_auth_event(
            "user_login_failed", username,
            f"Failed login from {ip}",
        )
        return templates.TemplateResponse(request, "login.html", {
            "request": request,
            "needs_setup": False,
            "error": "Invalid username or password.",
            "next": next_url,
        }, status_code=401)

    user = found[0]
    auth.clear_login_failures(username, ip)
    # Clear any pre-login session keys before populating the new identity.
    # Closes session-fixation: if an attacker had somehow seeded the
    # browser with a session cookie, this discards everything in it
    # before issuing the new authenticated payload.
    request.session.clear()
    request.session["user_id"] = user.id
    request.session["username"] = user.username
    await auth.touch_last_login(user.id)
    await auth.audit_auth_event(
        "user_login", user.username,
        f"Signed in from {ip}",
    )
    return RedirectResponse(url=next_url, status_code=303)


@router.post("/api/v1/auth/setup")
async def auth_first_user_setup(request: Request):
    """Create the first admin from the login page when the users table is empty.

    Public endpoint — but only does anything when zero users exist.

    Raises:
        HTTPException(409): at least one user already exists.
        HTTPException(400): ``auth.create_user`` rejected the input
            (its ``ValueError`` message is returned as the detail).
    """
    if (await auth.user_count()) > 0:
        raise HTTPException(status_code=409, detail="Users already exist.")

    form = await request.form()
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""
    full_name = (form.get("full_name") or "").strip() or None
    try:
        # bootstrap_only=True wraps the existence check + insert in an
        # IMMEDIATE transaction so two concurrent setup requests can't
        # both create admin accounts during the bootstrap window.
        user = await auth.create_user(
            username, password, full_name, is_admin=True, bootstrap_only=True
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Same fixation defense as the login flow — discard any pre-existing
    # session payload before issuing the authenticated identity.
    request.session.clear()
    request.session["user_id"] = user.id
    request.session["username"] = user.username
    await auth.touch_last_login(user.id)
    return RedirectResponse(url="/", status_code=303)


@router.get("/logout")
@router.post("/logout")
async def logout(request: Request):
    """Clear the session and redirect to ``/login``.

    A ``user_logout`` audit event is written only when
    ``request.state.current_user`` is set and truthy.
    """
    user = getattr(request.state, "current_user", None)
    if user:
        await auth.audit_auth_event(
            "user_logout", user.username,
            f"Signed out from {client_ip(request)}",
        )
    request.session.clear()
    return RedirectResponse(url="/login", status_code=303)


@router.post("/api/v1/auth/change-password")
async def change_password(request: Request):
    """Rotate the signed-in user's password and audit the change.

    Returns ``{"ok": True}`` on success.

    Raises:
        HTTPException(401): no ``request.state.current_user``.
        HTTPException(429): the password-change limiter rejected the attempt.
        HTTPException(400): the new password and its confirmation differ,
            or ``auth.change_password`` raised ``ValueError``.
    """
    user = getattr(request.state, "current_user", None)
    if not user:
        raise HTTPException(status_code=401, detail="Authentication required")

    ip = client_ip(request)
    # Rate-limit before bcrypt to keep an attacker-controlled session
    # from burning CPU brute-forcing the current_password field.
    keys = (("user", user.username.lower()), ("ip", ip))
    attempt = auth.pwchange_limiter.register(*keys)
    if attempt != "ok":
        raise HTTPException(
            status_code=429,
            detail="Too many password-change attempts. Try again later.",
        )

    form = await request.form()
    current = form.get("current_password") or ""
    new_pw = form.get("new_password") or ""
    confirm = form.get("confirm_password") or ""
    if new_pw != confirm:
        raise HTTPException(status_code=400, detail="New passwords do not match.")

    try:
        await auth.change_password(user.id, current, new_pw)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # Success: reset the limiter for this user/IP pair.
    auth.pwchange_limiter.clear(*keys)
    await auth.audit_auth_event(
        "user_password_changed", user.username,
        f"Password changed from {ip}",
    )
    return {"ok": True}