"""Unit tests for the pool-drive unlock state machine in burnin.py.

Covers: token validation per pool kind, identity-binding (grant invalidated
when pool_name/pool_role changes), TTL expiry, audit recording (each
successful unlock writes a pool-kind-specific audit event; a rejected token
writes none and arms no in-memory grant), and the unique-active-burnin
partial index that prevents duplicate queued rows for the same drive.

Uses a temporary on-disk SQLite DB and monkeypatches
app.config.settings.db_path (restored after every test).
No SSH, no network, no FastAPI.

Run with: python -m unittest discover tests/ -v
"""

import asyncio
import os
import tempfile
import time
import unittest

import aiosqlite

# Timestamp used for every seeded row; the tests never depend on its value.
_SEED_TIMESTAMP = "2026-05-02T00:00:00+00:00"

# (truenas_disk_id, devname, serial, pool_name, pool_role).
# Insertion order matters: the tests address these drives as ids 1, 2 and 3.
_SEED_DRIVES = (
    ("test-id-1", "sda", "TESTSER1", "tank", "data"),
    ("test-id-2", "sdb", "TESTSER2", "boot-pool", "data"),
    ("test-id-3", "sdc", "TESTSER3", "(exported)", "exported"),
)

_INSERT_DRIVE_SQL = """
    INSERT INTO drives
        (truenas_disk_id, devname, serial, model, size_bytes,
         temperature_c, smart_health, last_seen_at, last_polled_at,
         pool_name, pool_role, pool_seen_at)
    VALUES (?, ?, ?, 'TestModel', 1000, 30, 'PASSED', ?, ?, ?, ?, ?)
"""

_INSERT_JOB_SQL = """
    INSERT INTO burnin_jobs
        (drive_id, profile, state, percent, operator, created_at)
    VALUES (?,?,?,?,?,?)
"""


def _job_row(state: str, percent: int, created_at: str = _SEED_TIMESTAMP) -> tuple:
    """Return the parameter tuple for ``_INSERT_JOB_SQL`` targeting drive 1."""
    return (1, "surface", state, percent, "op", created_at)


async def _clear_pool_fields(db_path: str, drive_id: int) -> None:
    """Set pool_name and pool_role to NULL for one drive and commit."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute(
            "UPDATE drives SET pool_name=NULL, pool_role=NULL WHERE id=?",
            (drive_id,),
        )
        await db.commit()


async def _setup_temp_db() -> str:
    """Create a temp SQLite file, point app.config at it, init schema.

    Async-callable from IsolatedAsyncioTestCase.asyncSetUp.

    Returns:
        Path of the temp database file. The caller owns the file and is
        responsible for deleting it.
    """
    fd, path = tempfile.mkstemp(suffix=".db")
    # Only the path is needed; connections are opened separately below.
    os.close(fd)

    from app.config import settings
    settings.db_path = path

    from app.database import init_db
    await init_db()

    # Seed pool drives so unlock_flow tests have something to grant on.
    seed_rows = [
        (
            disk_id, devname, serial,
            _SEED_TIMESTAMP, _SEED_TIMESTAMP,
            pool_name, pool_role, _SEED_TIMESTAMP,
        )
        for disk_id, devname, serial, pool_name, pool_role in _SEED_DRIVES
    ]
    async with aiosqlite.connect(path) as db:
        await db.executemany(_INSERT_DRIVE_SQL, seed_rows)
        await db.commit()
    return path


class _TempDbTestCase(unittest.IsolatedAsyncioTestCase):
    """Shared fixture: fresh seeded temp DB and empty unlock-grant table.

    Defines no tests itself; concrete test classes derive from it.
    """

    async def asyncSetUp(self):
        from app.config import settings
        # Registered before the setting is overwritten so it is restored even
        # if schema init fails; otherwise later tests would inherit a path to
        # a file that asyncTearDown has already deleted.
        self.addCleanup(setattr, settings, "db_path", settings.db_path)

        self.db_path = await _setup_temp_db()

        # Reset module state so previous test runs don't bleed in.
        from app import burnin
        burnin._unlock_grants.clear()

    async def asyncTearDown(self):
        # Best-effort removal: a file that is already gone is not an error.
        try:
            os.unlink(self.db_path)
        except OSError:
            pass

    async def _latest_audit_event(self, drive_id: int):
        """Return the newest audit_events row for *drive_id*, or None."""
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cur = await db.execute(
                "SELECT event_type, operator, message FROM audit_events "
                "WHERE drive_id=? ORDER BY id DESC LIMIT 1",
                (drive_id,),
            )
            return await cur.fetchone()


class TestUnlockFlow(_TempDbTestCase):
    """Exercises grant_pool_unlock / _is_unlocked / unlock_expiry."""

    # ----- token validation per pool kind -----

    async def test_active_pool_token_is_pool_name(self):
        from app import burnin
        # Drive 1 = tank/data
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "wrong", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        self.assertGreater(expiry, time.time())

    async def test_boot_pool_token_is_destroy_phrase(self):
        from app import burnin
        # Drive 2 = boot-pool — typing the pool name must NOT work.
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(2, "boot-pool", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(
            2, "DESTROY BOOT POOL", "op", "valid reason"
        )
        self.assertGreater(expiry, time.time())

    async def test_exported_token_is_destroy_phrase(self):
        from app import burnin
        # Drive 3 = (exported)/exported
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(3, "(exported)", "op", "valid reason")
        expiry = await burnin.grant_pool_unlock(
            3, "DESTROY EXPORTED POOL", "op", "valid reason"
        )
        self.assertGreater(expiry, time.time())

    # ----- input validation -----

    async def test_empty_reason_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "")

    async def test_short_reason_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "hi")

    async def test_empty_operator_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "", "valid reason")

    async def test_unknown_drive_rejected(self):
        from app import burnin
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(99999, "anything", "op", "valid reason")

    async def test_drive_not_in_pool_rejected(self):
        from app import burnin
        # Manually clear pool fields on drive 1
        await _clear_pool_fields(self.db_path, 1)
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")

    # ----- identity binding (Codex finding #2) -----

    async def test_grant_invalidated_when_pool_name_changes(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        # Operator's grant references tank/data; pool detection now reports tank2.
        self.assertTrue(burnin._is_unlocked(1, "tank", "data"))
        self.assertFalse(burnin._is_unlocked(1, "tank2", "data"))
        # And the side effect: the grant is reaped, not just temporarily denied.
        self.assertNotIn(1, burnin._unlock_grants)

    async def test_grant_invalidated_when_pool_role_changes(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        # Same pool, different role (data -> cache).
        self.assertFalse(burnin._is_unlocked(1, "tank", "cache"))
        self.assertNotIn(1, burnin._unlock_grants)

    async def test_unlock_expiry_returns_none_for_mismatched_identity(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
        self.assertIsNotNone(burnin.unlock_expiry(1, "tank", "data"))
        self.assertIsNone(burnin.unlock_expiry(1, "tank2", "data"))

    # ----- TTL expiry -----

    async def test_expired_grant_returns_false(self):
        from app import burnin
        from app.burnin import unlock as _unlock
        # Drop TTL to 0 so the grant is born expired. Monkey-patch the
        # real source-of-truth in app.burnin.unlock — the alias on the
        # package root is bound at import time and won't propagate back.
        original = _unlock.UNLOCK_TTL_SECONDS
        _unlock.UNLOCK_TTL_SECONDS = 0
        try:
            await burnin.grant_pool_unlock(1, "tank", "op", "valid reason")
            self.assertFalse(burnin._is_unlocked(1, "tank", "data"))
            self.assertNotIn(1, burnin._unlock_grants)
        finally:
            _unlock.UNLOCK_TTL_SECONDS = original

    # ----- audit commit ordering (Codex finding #3) -----

    async def test_audit_event_recorded_for_active_pool(self):
        from app import burnin
        await burnin.grant_pool_unlock(1, "tank", "alice", "swapping out drive")
        row = await self._latest_audit_event(1)
        # Fail with a clear message rather than a TypeError on row[...].
        self.assertIsNotNone(row, "no audit event recorded for drive 1")
        self.assertEqual(row["event_type"], "pool_drive_unlocked")
        self.assertEqual(row["operator"], "alice")
        self.assertIn("swapping out drive", row["message"])

    async def test_audit_event_for_boot_pool_uses_distinct_type(self):
        from app import burnin
        await burnin.grant_pool_unlock(
            2, "DESTROY BOOT POOL", "alice", "replacing failed mirror"
        )
        row = await self._latest_audit_event(2)
        self.assertIsNotNone(row, "no audit event recorded for drive 2")
        self.assertEqual(row["event_type"], "boot_pool_drive_unlocked")

    async def test_audit_event_for_exported_uses_distinct_type(self):
        from app import burnin
        await burnin.grant_pool_unlock(
            3, "DESTROY EXPORTED POOL", "alice", "decommissioned pool"
        )
        row = await self._latest_audit_event(3)
        self.assertIsNotNone(row, "no audit event recorded for drive 3")
        self.assertEqual(row["event_type"], "exported_pool_drive_unlocked")

    async def test_failed_token_does_not_record_audit_event(self):
        from app import burnin
        # Assert the rejection explicitly; swallowing the exception would let
        # this test proceed even if the bad token were silently accepted.
        with self.assertRaises(ValueError):
            await burnin.grant_pool_unlock(1, "wrong-token", "op", "valid reason")
        async with aiosqlite.connect(self.db_path) as db:
            cur = await db.execute(
                "SELECT COUNT(*) FROM audit_events WHERE drive_id=?", (1,)
            )
            self.assertEqual((await cur.fetchone())[0], 0)
        # And no in-memory grant was armed.
        self.assertNotIn(1, burnin._unlock_grants)


class TestActiveJobUniqueIndex(_TempDbTestCase):
    """Codex finding #4 — the partial unique index on burnin_jobs(drive_id)
    WHERE state IN ('queued','running') must reject a second active row even
    when two requests pass the SELECT-COUNT check concurrently."""

    async def asyncSetUp(self):
        await super().asyncSetUp()
        # Need to clear the pool field on drive 1 so unlock isn't required
        # for these race tests.
        await _clear_pool_fields(self.db_path, 1)
        # Burnin orchestrator init for the semaphore
        from app import burnin
        burnin._semaphore = asyncio.Semaphore(4)

    async def test_index_blocks_second_active_insert(self):
        # Insert a queued row by hand, then try a second one — index fires.
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(_INSERT_JOB_SQL, _job_row("queued", 0))
            await db.commit()
            with self.assertRaises(aiosqlite.IntegrityError):
                await db.execute(
                    _INSERT_JOB_SQL,
                    _job_row("queued", 0, "2026-05-02T00:00:01+00:00"),
                )
                await db.commit()

    async def test_index_allows_terminal_state_then_new_job(self):
        # passed/failed/cancelled/unknown rows must not block a fresh queue.
        async with aiosqlite.connect(self.db_path) as db:
            for state in ("passed", "failed", "cancelled", "unknown"):
                await db.execute(_INSERT_JOB_SQL, _job_row(state, 100))
            await db.commit()
            # Should succeed — no other queued/running row exists.
            await db.execute(_INSERT_JOB_SQL, _job_row("queued", 0))
            await db.commit()


if __name__ == "__main__":
    unittest.main()