nas-burnin/tests/test_unlock_flow.py
Brandon Walter 9cbae44495
Some checks are pending
Security scan / pip-audit (push) Waiting to run
Security scan / bandit (push) Waiting to run
Security scan / gitleaks (push) Waiting to run
refactor: split burnin.py into a package — extract unlock + kill (1.0.0-30)
First slice of the planned tech-debt cleanup. burnin.py was 1667 lines
and growing; staged extraction gives smaller diffs to review and a
clear bisect target if anything regresses.

Mechanical move only — no behaviour change. The two extracted modules:

* app/burnin/unlock.py — _UnlockGrant, _unlock_grants, PoolMemberError,
  is_unlocked / unlock_expiry / grant_pool_unlock, plus the four
  *_TOKEN constants and UNLOCK_TTL_SECONDS. Owns its module-level
  state; opens its own DB connection in grant_pool_unlock so it
  doesn't depend on the parent package's _db() helper.

* app/burnin/kill.py — _remote_pids dict and the kill_remote_process /
  set_remote_pid / clear_remote_pid / get_remote_pid helpers. Pulled
  out of __init__.py so the asyncssh-ignores-signals workaround lives
  next to the state it operates on.

app/burnin/__init__.py re-exports every public symbol the rest of the
app imports — `from app import burnin; burnin.start_job(...)`,
`burnin.PoolMemberError`, `burnin.UNLOCK_TTL_SECONDS`, etc. all keep
working unchanged. Internal aliases `_remote_pids` and `_unlock_grants`
on the package root point at the SAME dict objects in the submodules,
so existing in-package mutations (set in stages, cleared in cleanup
callbacks) work without rewrite.

Test fix: tests/test_unlock_flow.py:test_expired_grant_returns_false
monkey-patches UNLOCK_TTL_SECONDS. The package-root alias is bound at
import time and won't propagate back to the submodule's read site, so
the test now patches `app.burnin.unlock.UNLOCK_TTL_SECONDS` directly.

Verification: 44/44 unit tests pass in container; /health 200;
container boots clean. routes.py, mailer.py, poller.py untouched —
the public API is identical.

Future: extract stages, task, _common in subsequent versions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 00:44:28 -04:00

306 lines
13 KiB
Python

"""Unit tests for the pool-drive unlock state machine in app/burnin/unlock.py.
Covers: token validation per pool kind, identity-binding (grant
invalidated when pool_name/pool_role changes), TTL expiry, the
audit-commit-then-arm ordering (a failing audit insert leaves no
in-memory grant), and the unique-active-burnin partial index that
prevents duplicate queued rows for the same drive.
Uses a temporary on-disk SQLite DB (tempfile.mkstemp) and monkeypatches app.config.settings.db_path.
No SSH, no network, no FastAPI.
Run with: python -m unittest discover tests/ -v
"""
import os
import tempfile
import time
import unittest
import aiosqlite
# Single INSERT shared by every seeded drive; values are bound as parameters.
_SEED_DRIVE_SQL = """
    INSERT INTO drives
        (truenas_disk_id, devname, serial, model, size_bytes,
         temperature_c, smart_health, last_seen_at, last_polled_at,
         pool_name, pool_role, pool_seen_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# Timestamp reused for last_seen_at, last_polled_at and pool_seen_at.
_SEED_TIMESTAMP = "2026-05-02T00:00:00+00:00"

# (truenas_disk_id, devname, serial, pool_name, pool_role).
# Insertion order matters: the tests address these rows as drive ids 1, 2
# and 3 (presumably an autoincrementing primary key on a fresh table).
_SEED_DRIVES = (
    ("test-id-1", "sda", "TESTSER1", "tank", "data"),
    ("test-id-2", "sdb", "TESTSER2", "boot-pool", "data"),
    ("test-id-3", "sdc", "TESTSER3", "(exported)", "exported"),
)


async def _setup_temp_db() -> str:
    """Create a temp SQLite file, point app.config at it, init schema.

    Async-callable from IsolatedAsyncioTestCase.asyncSetUp. After the schema
    is initialised, three pool-member drives are seeded so the unlock tests
    have something to grant on.

    Returns:
        Filesystem path of the temp database. The caller owns the file and
        is expected to unlink it in teardown.

    Raises:
        Whatever init_db() or the seed inserts raise. In that case the temp
        file is removed before the exception propagates.
    """
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        from app.config import settings
        settings.db_path = path

        # Imported only after db_path is set, mirroring the original ordering.
        from app.database import init_db
        await init_db()

        # Seed pool drives so unlock_flow tests have something to grant on.
        rows = [
            (
                disk_id, devname, serial, "TestModel", 1000,
                30, "PASSED", _SEED_TIMESTAMP, _SEED_TIMESTAMP,
                pool_name, pool_role, _SEED_TIMESTAMP,
            )
            for disk_id, devname, serial, pool_name, pool_role in _SEED_DRIVES
        ]
        async with aiosqlite.connect(path) as db:
            await db.executemany(_SEED_DRIVE_SQL, rows)
            await db.commit()
    except BaseException:
        # unittest does not run tearDown when setUp fails, so nobody else
        # would ever remove this file. Clean up here, then re-raise.
        try:
            os.unlink(path)
        except OSError:
            pass
        raise
    return path
class TestUnlockFlow(unittest.IsolatedAsyncioTestCase):
    """Tests for grant_pool_unlock, _is_unlocked and unlock_expiry.

    Each test starts from a freshly seeded temp database and an empty
    in-memory grant table. Seeded drives are addressed by id:
    1 = tank/data, 2 = boot-pool/data, 3 = (exported)/exported.
    """

    async def asyncSetUp(self):
        self.db_path = await _setup_temp_db()
        # Reset module state so previous test runs don't bleed in.
        from app import burnin
        burnin._unlock_grants.clear()

    async def asyncTearDown(self):
        # Best-effort cleanup: a missing or locked temp file must not turn a
        # passing test into an error.
        try:
            os.unlink(self.db_path)
        except OSError:
            pass

    async def _latest_audit_event(self, drive_id):
        """Return the newest audit_events row for ``drive_id``.

        Fails the test with a readable message when no row exists, instead
        of letting the caller hit a TypeError on ``None[...]``.
        """
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cur = await db.execute(
                "SELECT event_type, operator, message FROM audit_events "
                "WHERE drive_id=? ORDER BY id DESC LIMIT 1", (drive_id,)
            )
            row = await cur.fetchone()
        self.assertIsNotNone(
            row, f"no audit_events row recorded for drive {drive_id}"
        )
        return row

    # ----- token validation per pool kind -----

    async def test_active_pool_token_is_pool_name(self):
        from app import burnin
        # Drive 1 = tank/data
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "wrong", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        self.assertGreater(expiry, time.time())

    async def test_boot_pool_token_is_destroy_phrase(self):
        from app import burnin
        # Drive 2 = boot-pool — typing the pool name must NOT work.
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(2, "boot-pool", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(
            2, "DESTROY BOOT POOL", "op", "valid reason"
        )
        self.assertGreater(expiry, time.time())

    async def test_exported_token_is_destroy_phrase(self):
        from app import burnin
        # Drive 3 = (exported)/exported
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(3, "(exported)", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(
            3, "DESTROY EXPORTED POOL", "op", "valid reason"
        )
        self.assertGreater(expiry, time.time())

    # ----- input validation -----

    async def test_empty_reason_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "")

    async def test_short_reason_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "hi")

    async def test_empty_operator_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "", "valid reason")

    async def test_unknown_drive_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(99999, "anything", "op", "valid reason")

    async def test_drive_not_in_pool_rejected(self):
        from app import burnin
        # Manually clear pool fields on drive 1
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("UPDATE drives SET pool_name=NULL, pool_role=NULL WHERE id=1")
            await db.commit()
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")

    # ----- identity binding (Codex finding #2) -----

    async def test_grant_invalidated_when_pool_name_changes(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        # Operator's grant references tank/data; pool detection now reports tank2.
        self.assertTrue(burnin._is_unlocked(1, "tank", "data"))
        self.assertFalse(burnin._is_unlocked(1, "tank2", "data"))
        # And the side effect: the grant is reaped, not just temporarily denied.
        self.assertNotIn(1, burnin._unlock_grants)

    async def test_grant_invalidated_when_pool_role_changes(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        # Same pool, different role (data -> cache).
        self.assertFalse(burnin._is_unlocked(1, "tank", "cache"))
        self.assertNotIn(1, burnin._unlock_grants)

    async def test_unlock_expiry_returns_none_for_mismatched_identity(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        self.assertIsNotNone(burnin.unlock_expiry(1, "tank", "data"))
        self.assertIsNone(burnin.unlock_expiry(1, "tank2", "data"))

    # ----- TTL expiry -----

    async def test_expired_grant_returns_false(self):
        from unittest import mock

        from app import burnin
        from app.burnin import unlock as _unlock
        # Drop TTL to 0 so the grant is born expired. Patch the real
        # source-of-truth in app.burnin.unlock — the alias on the package
        # root is bound at import time and won't propagate back.
        # patch.object restores the original value on exit, even on failure.
        with mock.patch.object(_unlock, "UNLOCK_TTL_SECONDS", 0):
            await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
            self.assertFalse(burnin._is_unlocked(1, "tank", "data"))
            self.assertNotIn(1, burnin._unlock_grants)

    # ----- audit commit ordering (Codex finding #3) -----

    async def test_audit_event_recorded_for_active_pool(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "alice", "swapping out drive")
        row = await self._latest_audit_event(1)
        self.assertEqual(row["event_type"], "pool_drive_unlocked")
        self.assertEqual(row["operator"], "alice")
        self.assertIn("swapping out drive", row["message"])

    async def test_audit_event_for_boot_pool_uses_distinct_type(self):
        from app import burnin
        await burnin.grant_pool_unlock(
            2, "DESTROY BOOT POOL", "alice", "replacing failed mirror"
        )
        row = await self._latest_audit_event(2)
        self.assertEqual(row["event_type"], "boot_pool_drive_unlocked")

    async def test_audit_event_for_exported_uses_distinct_type(self):
        from app import burnin
        await burnin.grant_pool_unlock(
            3, "DESTROY EXPORTED POOL", "alice", "decommissioned pool"
        )
        row = await self._latest_audit_event(3)
        self.assertEqual(row["event_type"], "exported_pool_drive_unlocked")

    async def test_failed_token_does_not_record_audit_event(self):
        from app import burnin
        # Assert the rejection explicitly rather than swallowing it, so a
        # silently accepted bad token is reported as such.
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "wrong-token", "op", "valid reason")
        async with aiosqlite.connect(self.db_path) as db:
            cur = await db.execute(
                "SELECT COUNT(*) FROM audit_events WHERE drive_id=?", (1,)
            )
            self.assertEqual((await cur.fetchone())[0], 0)
        # And no in-memory grant was armed.
        self.assertNotIn(1, burnin._unlock_grants)
class TestActiveJobUniqueIndex(unittest.IsolatedAsyncioTestCase):
    """Codex finding #4 — the partial unique index on burnin_jobs(drive_id)
    WHERE state IN ('queued','running') must reject a second active row even
    when two requests pass the SELECT-COUNT check concurrently."""

    # Shared by every insert below; only state, percent and created_at vary.
    _INSERT_JOB_SQL = (
        "INSERT INTO burnin_jobs "
        "(drive_id, profile, state, percent, operator, created_at) "
        "VALUES (?,?,?,?,?,?)"
    )

    async def asyncSetUp(self):
        self.db_path = await _setup_temp_db()
        import asyncio

        from app import burnin
        burnin._unlock_grants.clear()
        # Need to clear the pool field on drive 1 so unlock isn't required
        # for these race tests.
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("UPDATE drives SET pool_name=NULL, pool_role=NULL WHERE id=1")
            await db.commit()
        # Burnin orchestrator init for the semaphore.
        # NOTE(review): the tests below only touch the table directly;
        # confirm whether this is still needed.
        burnin._semaphore = asyncio.Semaphore(4)

    async def asyncTearDown(self):
        # Best-effort cleanup of the temp database file.
        try:
            os.unlink(self.db_path)
        except OSError:
            pass

    async def _insert_job(self, db, state, percent, created_at):
        """Insert one 'surface' burnin_jobs row for drive 1 (no commit)."""
        await db.execute(
            self._INSERT_JOB_SQL,
            (1, "surface", state, percent, "op", created_at),
        )

    async def test_index_blocks_second_active_insert(self):
        # Insert a queued row by hand, then try a second one — index fires.
        async with aiosqlite.connect(self.db_path) as db:
            await self._insert_job(db, "queued", 0, "2026-05-02T00:00:00+00:00")
            await db.commit()
            with self.assertRaises(aiosqlite.IntegrityError):
                await self._insert_job(db, "queued", 0, "2026-05-02T00:00:01+00:00")
                # Kept inside the context so a violation raised at commit
                # time would satisfy the assertion as well.
                await db.commit()

    async def test_index_allows_terminal_state_then_new_job(self):
        # passed/failed/cancelled/unknown rows must not block a fresh queue.
        async with aiosqlite.connect(self.db_path) as db:
            for state in ("passed", "failed", "cancelled", "unknown"):
                await self._insert_job(db, state, 100, "2026-05-02T00:00:00+00:00")
            await db.commit()
            # Should succeed — no other queued/running row exists.
            await self._insert_job(db, "queued", 0, "2026-05-02T00:00:00+00:00")
            await db.commit()
# Allow running this file directly, in addition to unittest discovery.
if __name__ == "__main__":
    unittest.main()