"""Shared helpers used across multiple route modules. Anything more than one route file needs lives here. Single-use helpers stay in their owning route module. """ from __future__ import annotations from datetime import datetime, timezone from fastapi import HTTPException, Request from app.config import settings def client_ip(request: Request) -> str: """Best-effort source IP. Trusts X-Forwarded-For when present (we sit behind nginx-proxy-manager) but falls back to the direct peer.""" fwd = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip() return fwd or (request.client.host if request.client else "unknown") def operator_for(request: Request, _ignored_body_value: str | None = None) -> str: """Always return the logged-in user's name for audit attribution. The request body's `operator` field (if any) is ignored — clients can't spoof the operator identity any more.""" user = getattr(request.state, "current_user", None) if not user: raise HTTPException(status_code=401, detail="Authentication required") return user.full_name or user.username def is_stale(last_polled_at: str) -> bool: """True if the most recent poll is older than the stale threshold.""" try: last = datetime.fromisoformat(last_polled_at) if last.tzinfo is None: last = last.replace(tzinfo=timezone.utc) return (datetime.now(timezone.utc) - last).total_seconds() > settings.stale_threshold_seconds except Exception: return True def stale_context(ps: dict) -> dict: """Returns the {stale, stale_seconds} dict every HTML page passes to the layout for the warning banner.""" last = ps.get("last_poll_at") if not last: return {"stale": False, "stale_seconds": 0} try: t = datetime.fromisoformat(last) if t.tzinfo is None: t = t.replace(tzinfo=timezone.utc) age = (datetime.now(timezone.utc) - t).total_seconds() return { "stale": age > settings.stale_threshold_seconds, "stale_seconds": int(age), } except Exception: return {"stale": False, "stale_seconds": 0} # Field names that hold secrets and must never be rendered to the UI # verbatim or included in the redacted-settings dump. SECRET_FIELDS = ("smtp_password", "ssh_password", "ssh_key", "truenas_api_key") def secret_status() -> dict[str, str]: """Per-secret display string for the settings page so the operator can see whether each secret is configured (and how) without ever rendering the value. Distinguishes env-var, mounted-file, and DB-stored sources for ssh_key — the others can only come from the live settings object.""" import os as _os from app.ssh_client import _MOUNTED_KEY_PATH def _has(field: str) -> bool: v = getattr(settings, field, "") return bool(v) if _os.environ.get("SSH_KEY"): ssh_key_status = "set (environment variable)" elif _has("ssh_key"): ssh_key_status = "set (stored in settings DB — prefer a mounted secret in production)" elif _os.path.exists( _os.environ.get("SSH_KEY_FILE", _MOUNTED_KEY_PATH) ): ssh_key_status = "set (mounted secret)" else: ssh_key_status = "unset" return { "smtp_password": "set" if _has("smtp_password") else "unset", "ssh_password": "set" if _has("ssh_password") else "unset", "ssh_key": ssh_key_status, "truenas_api_key": "set" if _has("truenas_api_key") else "unset", }