truenas-burnin/mock-truenas/app.py
Brandon Walter b73b5251ae Initial commit — TrueNAS Burn-In Dashboard v0.5.0
Full-stack burn-in orchestration dashboard (Stages 1–6d complete):
FastAPI backend, SQLite/WAL, SSE live dashboard, mock TrueNAS server,
SMTP/webhook notifications, batch burn-in, settings UI, audit log,
stats page, cancel SMART/burn-in, drag-to-reorder stages.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 00:08:29 -05:00

345 lines
13 KiB
Python

"""
Mock TrueNAS CORE v2.0 API Server
Simulates the TrueNAS CORE REST API for development and testing.
All state is in-memory. Restart resets everything.
Simulation behavior:
- SHORT test completes in ~90 seconds real-time
- LONG test completes in ~8 minutes real-time
- Drive 'sdn' (serial FAIL001) always fails SMART at ~30%
- Temperatures drift slightly on each tick
- Debug endpoints at /debug/* for test control
"""
import asyncio
import random
import time
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# ASGI application object; every route below is registered on it.
# The version string corresponds to the release reported by /api/v2.0/system/info.
app = FastAPI(title="Mock TrueNAS CORE API", version="13.0-U6.1")
# ---------------------------------------------------------------------------
# Simulation constants
# ---------------------------------------------------------------------------
# Real-time seconds a simulated SHORT SMART test needs to reach 100%.
SHORT_DURATION_SECONDS = 90
# Real-time seconds a simulated LONG SMART test needs to reach 100% (8 minutes).
LONG_DURATION_SECONDS = 480
# Seconds between simulation ticks; job progress and temperatures only change on a tick.
TICK_SECONDS = 5
# ---------------------------------------------------------------------------
# Static drive inventory — 15 drives, sda-sdo, mixed capacities
# ---------------------------------------------------------------------------
# Seed data for the simulated drives, consumed by _init_state().
# Keys starting with "_" are simulation-only: _init_state() copies them into the
# drive record and _public_drive() strips them from API responses.
#   _base_temp   - initial "temperature" value; _tick() drifts around it
#   _always_fail - when True, SMART tests on this drive are failed by _tick()
_BASE_DRIVES = [
# 12TB Seagate Exos — sda, sdb, sdc
{"identifier": "3500151795937c001", "name": "sda", "devname": "sda", "serial": "WDZ1A001", "model": "ST12000NM0008", "size": 12000138625024, "rotationrate": 7200, "_base_temp": 36},
{"identifier": "3500151795937c002", "name": "sdb", "devname": "sdb", "serial": "WDZ1A002", "model": "ST12000NM0008", "size": 12000138625024, "rotationrate": 7200, "_base_temp": 34},
{"identifier": "3500151795937c003", "name": "sdc", "devname": "sdc", "serial": "WDZ1A003", "model": "ST12000NM0008", "size": 12000138625024, "rotationrate": 7200, "_base_temp": 37},
# 8TB WD Red — sdd, sde, sdf
{"identifier": "3500151795937c004", "name": "sdd", "devname": "sdd", "serial": "WDZ1A004", "model": "WD80EFAX", "size": 8001563222016, "rotationrate": 5400, "_base_temp": 32},
{"identifier": "3500151795937c005", "name": "sde", "devname": "sde", "serial": "WDZ1A005", "model": "WD80EFAX", "size": 8001563222016, "rotationrate": 5400, "_base_temp": 33},
{"identifier": "3500151795937c006", "name": "sdf", "devname": "sdf", "serial": "WDZ1A006", "model": "WD80EFAX", "size": 8001563222016, "rotationrate": 5400, "_base_temp": 31},
# 16TB Seagate Exos — sdg, sdh
{"identifier": "3500151795937c007", "name": "sdg", "devname": "sdg", "serial": "WDZ1A007", "model": "ST16000NM001G", "size": 16000900661248, "rotationrate": 7200, "_base_temp": 38},
{"identifier": "3500151795937c008", "name": "sdh", "devname": "sdh", "serial": "WDZ1A008", "model": "ST16000NM001G", "size": 16000900661248, "rotationrate": 7200, "_base_temp": 39},
# 4TB Seagate IronWolf — sdi, sdj
{"identifier": "3500151795937c009", "name": "sdi", "devname": "sdi", "serial": "WDZ1A009", "model": "ST4000VN008", "size": 4000787030016, "rotationrate": 5900, "_base_temp": 30},
{"identifier": "3500151795937c00a", "name": "sdj", "devname": "sdj", "serial": "WDZ1A010", "model": "ST4000VN008", "size": 4000787030016, "rotationrate": 5900, "_base_temp": 29},
# 10TB Toshiba — sdk, sdl
{"identifier": "3500151795937c00b", "name": "sdk", "devname": "sdk", "serial": "WDZ1A011", "model": "TOSHIBA MG06ACA10TE", "size": 10000831348736, "rotationrate": 7200, "_base_temp": 41},
{"identifier": "3500151795937c00c", "name": "sdl", "devname": "sdl", "serial": "WDZ1A012", "model": "TOSHIBA MG06ACA10TE", "size": 10000831348736, "rotationrate": 7200, "_base_temp": 40},
# 8TB HGST — sdm
{"identifier": "3500151795937c00d", "name": "sdm", "devname": "sdm", "serial": "WDZ1A013", "model": "HGST HUH728080ALE604", "size": 8001563222016, "rotationrate": 7200, "_base_temp": 35},
# Always-fail drive — sdn
{"identifier": "3500151795937c00e", "name": "sdn", "devname": "sdn", "serial": "FAIL001", "model": "TOSHIBA MG06ACA10TE", "size": 10000831348736, "rotationrate": 7200, "_base_temp": 45, "_always_fail": True},
# 6TB Seagate Archive — sdo
{"identifier": "3500151795937c00f", "name": "sdo", "devname": "sdo", "serial": "WDZ1A015", "model": "ST6000DM003", "size": 6001175126016, "rotationrate": 5900, "_base_temp": 33},
]
# Shared fields for every drive
# Merged into every drive record by _init_state() before the per-drive
# values from _BASE_DRIVES are applied on top.
_DRIVE_DEFAULTS = dict(
    type="HDD",
    bus="SCSI",
    togglesmart=True,
    pool=None,
    enclosure=None,
)
# ---------------------------------------------------------------------------
# Mutable in-memory state
# ---------------------------------------------------------------------------
# Single container for everything the mock server remembers between requests.
_state: dict = dict(
    drives={},          # devname -> drive record (includes private "_" keys)
    jobs={},            # job id -> job record (includes private "_" keys)
    smart_history={},   # devname -> list of SMART test results, newest first
    job_counter=1000,   # last job id handed out; incremented before each use
)
def _init_state() -> None:
    """Create one drive record and one empty SMART history per base drive.

    Each record is built from ``_DRIVE_DEFAULTS``, overlaid with the public
    (non-underscore) keys of the ``_BASE_DRIVES`` entry, plus derived fields:
    a ``zfs_guid`` computed from the identifier, the starting ``temperature``,
    a ``PASSED`` health flag, and the private simulation keys
    ``_base_temp`` / ``_always_fail``. Records are keyed by ``devname``.
    """
    drives = _state["drives"]
    histories = _state["smart_history"]
    for base in _BASE_DRIVES:
        name = base["devname"]

        record = dict(_DRIVE_DEFAULTS)
        record.update(
            (key, value) for key, value in base.items() if not key.startswith("_")
        )
        # The identifier is parsed as hexadecimal and re-rendered zero-padded.
        record["zfs_guid"] = f"1234{int(base['identifier'], 16):016x}"
        record["temperature"] = base["_base_temp"]
        record["smart_health"] = "PASSED"
        record["_base_temp"] = base["_base_temp"]
        record["_always_fail"] = base.get("_always_fail", False)

        drives[name] = record
        histories[name] = []
# Populate the drive table and SMART histories once, at import time.
_init_state()
def _public_drive(d: dict) -> dict:
    """Return a new dict holding drive record *d* minus its ``_``-prefixed keys."""
    visible = {}
    for key, value in d.items():
        if key.startswith("_"):
            continue
        visible[key] = value
    return visible
def _public_job(j: dict) -> dict:
    """Return a new dict holding job record *j* minus its ``_``-prefixed keys."""
    return dict(pair for pair in j.items() if not pair[0].startswith("_"))
# ---------------------------------------------------------------------------
# Simulation loop
# ---------------------------------------------------------------------------
async def _simulation_loop() -> None:
    """Run the simulation forever: sleep ``TICK_SECONDS``, then call ``_tick()``.

    The sleep comes first, so the first tick happens one interval after the
    coroutine starts. The loop has no exit condition of its own.
    """
    while True:
        pause = asyncio.sleep(TICK_SECONDS)
        await pause
        _tick()
def _tick() -> None:
    """Advance the simulation by one step.

    1. Every drive's temperature is reset to its base temperature plus a
       random offset in [-1, 2], clamped to the range 20..70.
    2. Every RUNNING job has its progress recomputed from monotonic elapsed
       time. Jobs on an always-fail drive are marked FAILED once they reach
       30% of their duration; other jobs are marked SUCCESS at 100%. In both
       cases the outcome is appended to the drive's SMART history.
    """
    # Drift is applied to the fixed baseline, so it never accumulates.
    for disk in _state["drives"].values():
        wobble = random.randint(-1, 2)
        disk["temperature"] = min(70, max(20, disk["_base_temp"] + wobble))

    # One timestamp is shared by every job that finishes during this tick.
    finished_at = datetime.now(timezone.utc).isoformat()

    # Iterate over a snapshot so the jobs dict may change during the loop.
    for job in list(_state["jobs"].values()):
        if job["state"] != "RUNNING":
            continue

        progress = job["progress"]
        fraction = (time.monotonic() - job["_started_mono"]) / job["_duration_seconds"]

        if job["_always_fail"] and fraction >= 0.30:
            job.update(
                state="FAILED",
                error="SMART test aborted: uncorrectable read error at LBA 0x1234567",
            )
            progress["percent"] = 30
            progress["description"] = "Test failed"
            job["time_finished"] = finished_at
            _record_smart_result(job, failed=True)
            continue

        percent = min(100, int(fraction * 100))
        progress["percent"] = percent
        progress["description"] = (
            f"Running SMART {job['_test_type'].lower()} test on {job['_disk']} ({percent}%)"
        )
        if percent < 100:
            continue

        job.update(state="SUCCESS", result=True, time_finished=finished_at)
        progress["percent"] = 100
        progress["description"] = "Completed without error"
        _record_smart_result(job, failed=False)
def _record_smart_result(job: dict, failed: bool) -> None:
    """Append a finished SMART test to its drive's history and update health.

    Args:
        job: Job record; only ``job["_disk"]`` (device name) and
            ``job["_test_type"]`` (``"SHORT"`` or anything else, treated as
            extended) are read.
        failed: ``True`` to record a read failure, ``False`` for a clean run.

    The new entry is inserted at index 0, so the history list is ordered
    newest first. ``num`` is the 1-based count of tests recorded for the
    drive so far. If the drive exists in ``_state["drives"]`` its
    ``smart_health`` becomes ``"FAILED"`` or ``"PASSED"`` accordingly.
    """
    devname = job["_disk"]
    is_short = job["_test_type"] == "SHORT"

    # setdefault rather than .get(devname, []): with .get, a drive that has no
    # history list yet would receive the entry in a throwaway list and the
    # result would be silently lost.
    history = _state["smart_history"].setdefault(devname, [])

    entry = {
        "num": len(history) + 1,
        "type": "Short offline" if is_short else "Extended offline",
        "status": "Read failure" if failed else "Completed without error",
        "status_verbose": (
            "Read failure - error in segment #1" if failed
            else "Completed without error"
        ),
        "remaining": 0,
        "lifetime": random.randint(10000, 50000),
        "lba_of_first_error": "0x1234567" if failed else None,
    }
    history.insert(0, entry)

    drive = _state["drives"].get(devname)
    if drive is not None:
        drive["smart_health"] = "FAILED" if failed else "PASSED"
# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------
class SmartTestRequest(BaseModel):
    # Request body for POST /api/v2.0/smart/test.
    # Device names to test; each must be a key of the drive table (e.g. "sda").
    disks: list[str]
    type: str  # SHORT | LONG
class AbortRequest(BaseModel):
    # Request body for POST /api/v2.0/core/job_abort.
    # Id of the job to abort, as returned when the SMART test was started.
    id: int
# ---------------------------------------------------------------------------
# API Routes — mirrors TrueNAS CORE v2.0
# ---------------------------------------------------------------------------
@app.get("/api/v2.0/disk")
async def list_disks():
    """Return the public view of every simulated drive as a list."""
    return list(map(_public_drive, _state["drives"].values()))
@app.get("/api/v2.0/disk/{identifier}")
async def get_disk(identifier: str):
    """Return one drive, looked up by its ``identifier`` or its ``devname``.

    Raises:
        HTTPException: 404 when no drive matches.
    """
    match = next(
        (
            drive
            for drive in _state["drives"].values()
            if drive["identifier"] == identifier or drive["devname"] == identifier
        ),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="Disk not found")
    return _public_drive(match)
@app.get("/api/v2.0/smart/test/results/{disk_name}")
async def smart_test_results(disk_name: str):
    """Return the SMART test history of *disk_name*, newest entry first.

    The response is a one-element list of ``{"disk": ..., "tests": [...]}``.

    Raises:
        HTTPException: 404 when the disk has no history entry.
    """
    try:
        tests = _state["smart_history"][disk_name]
    except KeyError:
        raise HTTPException(status_code=404, detail="Disk not found") from None
    return [{"disk": disk_name, "tests": tests}]
@app.post("/api/v2.0/smart/test")
async def start_smart_test(req: SmartTestRequest):
    """Start one simulated SMART test job per requested disk.

    Args:
        req: Disks to test and the test type (``"SHORT"`` or ``"LONG"``).

    Returns:
        The new job id when exactly one job was created, otherwise the list
        of new job ids (empty when no disks were given).

    Raises:
        HTTPException: 422 when the type is not SHORT/LONG; 404 when any
            requested disk is unknown. In both cases no job is created.
    """
    if req.type not in ("SHORT", "LONG"):
        raise HTTPException(status_code=422, detail="type must be SHORT or LONG")

    # Validate the whole request before creating anything. Checking inside the
    # creation loop would leave RUNNING jobs behind for the disks that precede
    # an unknown one, even though the caller only sees a 404.
    for disk_name in req.disks:
        if disk_name not in _state["drives"]:
            raise HTTPException(status_code=404, detail=f"Disk {disk_name} not found")

    duration = SHORT_DURATION_SECONDS if req.type == "SHORT" else LONG_DURATION_SECONDS
    job_ids = []
    for disk_name in req.disks:
        _state["job_counter"] += 1
        job_id = _state["job_counter"]
        drive = _state["drives"][disk_name]
        _state["jobs"][job_id] = {
            "id": job_id,
            "method": "smart.test",
            "arguments": [{"disks": [disk_name], "type": req.type}],
            "state": "RUNNING",
            "progress": {
                "percent": 0,
                "description": f"Running SMART {req.type.lower()} test on {disk_name}",
                "extra": None,
            },
            "result": None,
            "error": None,
            "exception": None,
            "time_started": datetime.now(timezone.utc).isoformat(),
            "time_finished": None,
            # Private bookkeeping read by _tick(); stripped by _public_job().
            "_started_mono": time.monotonic(),
            "_duration_seconds": duration,
            "_disk": disk_name,
            "_test_type": req.type,
            "_always_fail": drive["_always_fail"],
        }
        job_ids.append(job_id)

    return job_ids[0] if len(job_ids) == 1 else job_ids
@app.get("/api/v2.0/core/get_jobs")
async def get_jobs(method: Optional[str] = None, state: Optional[str] = None):
    """List jobs, optionally filtered by exact ``method`` and/or ``state``.

    A filter that is omitted or empty matches every job.
    """
    return [
        _public_job(job)
        for job in _state["jobs"].values()
        if (not method or job["method"] == method)
        and (not state or job["state"] == state)
    ]
@app.get("/api/v2.0/core/get_jobs/{job_id}")
async def get_job(job_id: int):
    """Return the public view of a single job.

    Raises:
        HTTPException: 404 when no job has this id.
    """
    found = _state["jobs"].get(job_id)
    if found:
        return _public_job(found)
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/api/v2.0/core/job_abort")
async def abort_job(req: AbortRequest):
    """Mark a RUNNING job as ABORTED and return ``True``.

    Raises:
        HTTPException: 404 when the job does not exist; 400 when it is not
            currently RUNNING.
    """
    target = _state["jobs"].get(req.id)
    if not target:
        raise HTTPException(status_code=404, detail="Job not found")

    current = target["state"]
    if current != "RUNNING":
        raise HTTPException(status_code=400, detail=f"Job is not running (state={current})")

    target.update(
        state="ABORTED",
        time_finished=datetime.now(timezone.utc).isoformat(),
    )
    target["progress"]["description"] = "Aborted by user"
    return True
@app.get("/api/v2.0/system/info")
async def system_info():
    """Return a fixed set of system facts describing the mock server."""
    return dict(
        version="TrueNAS-13.0-U6.1",
        hostname="mock-truenas",
        uptime_seconds=86400,
        system_serial="MOCK-SN-001",
        system_product="MOCK SERVER",
        cores=4,
        physmem=17179869184,
    )
@app.get("/health")
async def health():
    """Report liveness together with the current drive and job counts."""
    drive_count = len(_state["drives"])
    job_count = len(_state["jobs"])
    return {"status": "ok", "mock": True, "drives": drive_count, "jobs": job_count}
# ---------------------------------------------------------------------------
# Debug endpoints
# ---------------------------------------------------------------------------
@app.post("/debug/reset")
async def debug_reset():
    """Discard all drives, jobs and SMART history, then rebuild the initial state.

    The job counter is put back to 1000.
    """
    for section in ("drives", "jobs", "smart_history"):
        _state[section].clear()
    _state["job_counter"] = 1000
    _init_state()
    return {"reset": True}
@app.get("/debug/state")
async def debug_state():
    """Dump the whole in-memory state with private (``_``) keys removed.

    Job ids are converted to strings for use as keys in the response.
    """
    drives = {}
    for devname, record in _state["drives"].items():
        drives[devname] = _public_drive(record)

    jobs = {}
    for job_id, record in _state["jobs"].items():
        jobs[str(job_id)] = _public_job(record)

    return {
        "drives": drives,
        "jobs": jobs,
        "smart_history": _state["smart_history"],
        "job_counter": _state["job_counter"],
    }
@app.post("/debug/complete-all-jobs")
async def debug_complete_all():
    """Fast-forward every RUNNING job by its full duration.

    Only the start time is moved back; job state is left untouched here and
    is resolved by ``_tick()`` from the adjusted elapsed time. Returns the
    ids of the jobs that were fast-forwarded.
    """
    touched = []
    for job_id, job in _state["jobs"].items():
        if job["state"] != "RUNNING":
            continue
        job["_started_mono"] = job["_started_mono"] - job["_duration_seconds"]
        touched.append(job_id)
    return {"fast_forwarded": touched}
# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------
@app.on_event("startup")
async def startup():
    """Launch the background simulation loop when the application starts."""
    # The event loop keeps only weak references to tasks, so a task whose
    # handle is discarded can be garbage-collected while still running.
    # Storing it on app.state keeps it alive for the life of the application.
    app.state.simulation_task = asyncio.create_task(_simulation_loop())