First slice of the planned tech-debt cleanup. burnin.py was 1667 lines and growing; staged extraction gives smaller diffs to review and a clear bisect target if anything regresses. Mechanical move only — no behaviour change.

The two extracted modules:

* app/burnin/unlock.py — _UnlockGrant, _unlock_grants, PoolMemberError, is_unlocked / unlock_expiry / grant_pool_unlock, plus the four *_TOKEN constants and UNLOCK_TTL_SECONDS. Owns its module-level state; opens its own DB connection in grant_pool_unlock so it doesn't depend on the parent package's _db() helper.
* app/burnin/kill.py — _remote_pids dict and the kill_remote_process / set_remote_pid / clear_remote_pid / get_remote_pid helpers. Pulled out of __init__.py so the asyncssh-ignores-signals workaround lives next to the state it operates on.

app/burnin/__init__.py re-exports every public symbol the rest of the app imports — `from app import burnin; burnin.start_job(...)`, `burnin.PoolMemberError`, `burnin.UNLOCK_TTL_SECONDS`, etc. all keep working unchanged. Internal aliases `_remote_pids` and `_unlock_grants` on the package root point at the SAME dict objects in the submodules, so existing in-package mutations (set in stages, cleared in cleanup callbacks) work without rewrite.

Test fix: tests/test_unlock_flow.py:test_expired_grant_returns_false monkey-patches UNLOCK_TTL_SECONDS. The package-root alias is bound at import time and won't propagate back to the submodule's read site, so the test now patches `app.burnin.unlock.UNLOCK_TTL_SECONDS` directly.

Verification: 44/44 unit tests pass in container; /health 200; container boots clean. routes.py, mailer.py, poller.py untouched — the public API is identical.

Future: extract stages, task, _common in subsequent versions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
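The aliasing behaviour described in the commit message (shared dicts keep working, a patched constant does not propagate) can be reproduced with two stand-in modules. This is a minimal sketch, not the project's code: the module names `unlock_demo` and `root_demo`, the helper `current_ttl`, and the value 600 are all hypothetical and chosen only for illustration.

```python
import types

# Hypothetical stand-in for the submodule (same shape as a module that
# defines a constant, a dict, and a function reading the constant).
unlock = types.ModuleType("unlock_demo")
exec(
    """
UNLOCK_TTL_SECONDS = 600   # illustrative value, not the real TTL
_unlock_grants = {}

def current_ttl():
    # Looks the name up in this module's globals at call time.
    return UNLOCK_TTL_SECONDS
""",
    unlock.__dict__,
)

# Hypothetical stand-in for the package root: this is what a
# `from .unlock import UNLOCK_TTL_SECONDS, _unlock_grants` amounts to.
root = types.ModuleType("root_demo")
root.UNLOCK_TTL_SECONDS = unlock.UNLOCK_TTL_SECONDS
root._unlock_grants = unlock._unlock_grants

# Mutating the dict through the alias is visible in the submodule,
# because both names refer to the same object.
root._unlock_grants["pool-a"] = 123.0
shared_dict_ok = unlock._unlock_grants == {"pool-a": 123.0}

# Rebinding the constant on the root only changes the root's name.
root.UNLOCK_TTL_SECONDS = 0
ttl_after_root_patch = unlock.current_ttl()

# Rebinding it on the submodule reaches the read site.
unlock.UNLOCK_TTL_SECONDS = 0
ttl_after_submodule_patch = unlock.current_ttl()
```

In this sketch the dict alias survives because it is mutated in place, while the integer constant is rebound, which is why a test that replaces the constant has to target the module whose functions read it.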
71 lines
2.5 KiB
Python
"""Remote process kill machinery.

asyncssh's ``proc.kill()`` sends an SSH "signal" channel request that
OpenSSH's sshd ignores by default — the remote process keeps running and
``proc.wait()`` hangs forever, pinning the asyncio.Semaphore slot.
The fix: capture the remote PID at command launch (via the
``sh -c 'echo PID:$$; exec ...'`` wrapper) and issue ``kill -9 <pid>``
over a fresh SSH session when we need to abort. This module owns that
state and the kill helper.

Public surface (used by the rest of the burnin package):
    set_remote_pid(job_id, pid)   — call from the stage when launch succeeds
    clear_remote_pid(job_id)      — call from the cleanup callback
    kill_remote_process(job_id)   — fire-and-clear; safe to call repeatedly
"""

from __future__ import annotations

import asyncio
import logging

log = logging.getLogger(__name__)


# job_id -> remote PID. Module-level dict so it survives across the
# stage / task / __init__ split without needing to thread it through
# function signatures.
_remote_pids: dict[int, int] = {}


def set_remote_pid(job_id: int, pid: int) -> None:
    """Record the remote PID captured by the running stage."""
    _remote_pids[job_id] = pid


def clear_remote_pid(job_id: int) -> None:
    """Drop the PID without trying to kill — used by the task cleanup
    callback so a normally-completed job doesn't carry stale state."""
    _remote_pids.pop(job_id, None)


def get_remote_pid(job_id: int) -> int | None:
    return _remote_pids.get(job_id)


async def kill_remote_process(job_id: int) -> None:
    """Send kill -9 to the remote PID associated with this job, if any.

    Idempotent — pops the PID before attempting the kill so a second
    call is a no-op. SSH connection failure is logged but never raised
    (we'd rather best-effort-kill than block the cancel path).
    """
    pid = _remote_pids.pop(job_id, None)
    if not pid:
        return
    try:
        # Local import to avoid pulling asyncssh into module load if
        # this helper is never used (tests, mock mode).
        from app import ssh_client
        async with await ssh_client._connect() as conn:
            await asyncio.wait_for(
                conn.run(
                    f"kill -9 {pid} 2>/dev/null || true", check=False,
                ),
                timeout=10,
            )
        log.info("Remote-killed PID %d for job %d", pid, job_id)
    except Exception as exc:
        log.warning(
            "Failed to remote-kill PID %d for job %d: %s", pid, job_id, exc,
        )
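
The module docstring mentions the ``sh -c 'echo PID:$$; exec ...'`` wrapper, but the stage code that builds it and reads the PID back is not shown here. The following is a minimal sketch of how such a wrapper and its parser might look. The names `wrap_with_pid_echo`, `parse_pid_line` and `PID_PREFIX` are hypothetical and do not come from this repository.

```python
import shlex
from typing import Optional

PID_PREFIX = "PID:"  # hypothetical marker; matches the docstring's example


def wrap_with_pid_echo(command: str) -> str:
    """Wrap a command so the shell prints its own PID, then execs the command.

    Because of `exec`, the command replaces the shell process, so the
    PID printed via `$$` is also the PID of the command itself.
    The command is inserted as-is and is assumed to be trusted.
    """
    script = f"echo {PID_PREFIX}$$; exec {command}"
    return f"sh -c {shlex.quote(script)}"


def parse_pid_line(line: str) -> Optional[int]:
    """Return the PID from a 'PID:<digits>' line, or None for any other line."""
    line = line.strip()
    if not line.startswith(PID_PREFIX):
        return None
    digits = line[len(PID_PREFIX):]
    return int(digits) if digits.isdigit() else None


wrapped = wrap_with_pid_echo("sleep 30")
```

A stage using this approach would read the first line of remote output, pass it to `parse_pid_line`, and hand any resulting integer to `set_remote_pid(job_id, pid)`.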