From 149f2901b7dc751c1c2626b847966531bd6f3c61 Mon Sep 17 00:00:00 2001 From: Brandon Walter <51866976+echoparkbaby@users.noreply.github.com> Date: Mon, 11 May 2026 22:07:39 -0700 Subject: [PATCH] fix: throttle ALL drain-loop DB calls + drop progress noise from log (1.0.0-54) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.0.0-52 throttled the percent/bb_phase writes but missed: - `_is_cancelled` ran a DB query on EVERY stderr line (sub-second cadence × 4 concurrent burn-ins = ~10+ DB connection opens/s) - `_append_stage_log` ran every 20 output_lines (~once per second) doing a quadratic `log_text || ?` concat that turns into multi-MB rewrites as the log grows - `_recalculate_progress` + `_push_update` also fired per gated tick Cumulative load kept the asyncssh drain coroutine too busy to consume the SSH channel buffer; SSH window stalled; sshd stopped reading the pipe; badblocks blocked on pipe_write with state=S wchan=pipe_write. /sys/block sectors_written delta confirmed 0 disk I/O across all running drives despite 23h elapsed. Fix: 1. Single throttle gate (BB_DB_MIN_SECONDS=5s) covers EVERY DB touch in the drain — cancel check, percent/phase/bb_count updates, throughput sample, log flush, recalc, SSE push. Phase transitions still bypass the throttle (rare + important). 2. Exclude "XX% done" lines from the log entirely. They were the dominant volume; meaningful content (pattern headers, errors, bad-block numbers) still gets logged via the throttled flush. 3. log_text concat is still quadratic but the volume reduction makes it tractable — buffer to pending_log_chunks, flush on the gate. Net effect: ~99% reduction in drain-loop DB load. asyncssh drain keeps up; pipe drains; badblocks writes; disk I/O resumes. 
--- app/burnin/stages.py | 161 ++++++++++++++++++++++++------------------- app/config.py | 2 +- 2 files changed, 93 insertions(+), 70 deletions(-) diff --git a/app/burnin/stages.py b/app/burnin/stages.py index a261765..f7c4b2c 100644 --- a/app/burnin/stages.py +++ b/app/burnin/stages.py @@ -521,6 +521,9 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) # from the delta on each new sample. last_pct_sample: float = progress.overall_pct last_db_write_ts: float = time.monotonic() + # Lines accumulated since last log flush. Flushed in the + # throttled DB-write window (see BB_DB_MIN_SECONDS). + pending_log_chunks: list[str] = [] # Seed bb_phase=1, bb_phase_pct=0 immediately so the # drawer's per-pattern meters have something to render @@ -557,80 +560,104 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) pass continue - output_lines.append(line) - + # Drive progress.update from EVERY stderr line so it + # picks up the "Testing with pattern 0xXX" + "Reading + # and comparing" headers (which advance the phase + # counter), not just the percent-done lines. CPU- + # bound regex work, no I/O, safe to do unconditionally. + prev_phase = progress.phase + phase_changed = False + is_progress_line = False if is_stderr: - # Drive progress.update from EVERY stderr line so it - # picks up the "Testing with pattern 0xXX" + "Reading - # and comparing" headers (which advance the phase - # counter), not just the percent-done lines. - prev_phase = progress.phase progress.update(line) phase_changed = progress.phase != prev_phase - - # Throttle DB writes. Each progress line used to - # trigger 4-6 transactions; with 4 concurrent - # burn-ins and sub-second progress lines that - # backs up the asyncssh drain → pipe fills → - # badblocks blocks on pipe_write → no disk I/O. 
- # Commit at most once every BB_DB_MIN_SECONDS, - # plus always on phase transitions (transitions - # are rare AND important: they stamp history, - # advance the meter strip). - now_ts = time.monotonic() - time_since_last_db = now_ts - last_db_write_ts - should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS - - if should_write: - if phase_changed: - # Stamp the moment we first see this - # phase so the drawer can show - # per-pattern elapsed times. - await _record_bb_phase_start( - job_id, "surface_validate", - progress.phase, _now(), - ) - await _update_stage_percent( - job_id, "surface_validate", progress.overall_pct, - ) - await _update_stage_bb_phase( - job_id, "surface_validate", - progress.phase, progress.phase_pct, - ) - await _update_stage_bad_blocks( - job_id, "surface_validate", bad_blocks_total, - ) - # Throughput: compute since the last DB - # write, not since the last progress line. - # Skip phase transitions (per-phase pct - # resets would yield a negative delta). - if ( - drive_size_bytes - and not phase_changed - and progress.overall_pct > last_pct_sample - and time_since_last_db >= 1.0 - ): - d_pct = progress.overall_pct - last_pct_sample - bytes_done = (d_pct / 800.0) * drive_size_bytes - mbps = bytes_done / time_since_last_db / 1_000_000 - await _update_stage_bb_mbps( - job_id, "surface_validate", mbps, - ) - last_pct_sample = progress.overall_pct - last_db_write_ts = now_ts - await _recalculate_progress(job_id) - _push_update() + is_progress_line = bool(_BB_PERCENT_RE.search(line)) else: stripped = line.strip() if stripped and stripped.isdigit(): bad_blocks_total += 1 - # Append to DB log in chunks - if len(output_lines) % 20 == 0: - chunk = "".join(output_lines[-20:]) - await _append_stage_log(job_id, "surface_validate", chunk) + # Keep the "XX% done" lines OUT of output_lines. + # They're the dominant volume (sub-second cadence + # for hours) and the log_text concat is quadratic + # in length. 
Headers, errors, bad-block numbers, + # and other diagnostic output still get logged. + if not is_progress_line: + output_lines.append(line) - # Abort on bad block threshold + # Single throttle gate covering EVERY DB touch in + # this loop. Without this, _is_cancelled + percent + # writes + chunk-log appends collectively overwhelm + # the asyncio loop, the asyncssh drain falls behind, + # the SSH window stops advancing, sshd stops reading + # the pipe, badblocks blocks on pipe_write and no + # disk I/O happens. Symptom: badblocks pid in + # wchan=pipe_write with /sys/block sectors_written + # delta of 0. + now_ts = time.monotonic() + time_since_last_db = now_ts - last_db_write_ts + should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS + + if should_write: + # 1) Cancellation check (was per-line) + if await _is_cancelled(job_id): + await kill.kill_remote_process(job_id) + return + + # 2) Per-pattern progress + history + if phase_changed: + await _record_bb_phase_start( + job_id, "surface_validate", + progress.phase, _now(), + ) + await _update_stage_percent( + job_id, "surface_validate", progress.overall_pct, + ) + await _update_stage_bb_phase( + job_id, "surface_validate", + progress.phase, progress.phase_pct, + ) + await _update_stage_bad_blocks( + job_id, "surface_validate", bad_blocks_total, + ) + + # 3) Throughput. Skip phase transitions (per- + # phase pct resets would yield a negative delta). + if ( + drive_size_bytes + and not phase_changed + and progress.overall_pct > last_pct_sample + and time_since_last_db >= 1.0 + ): + d_pct = progress.overall_pct - last_pct_sample + bytes_done = (d_pct / 800.0) * drive_size_bytes + mbps = bytes_done / time_since_last_db / 1_000_000 + await _update_stage_bb_mbps( + job_id, "surface_validate", mbps, + ) + + # 4) Log flush. Chunk what's accumulated since + # the last write. log_text concat is quadratic + # so the volume reduction from skipping "% done" + # lines above matters MORE than the throttle. 
+ if pending_log_chunks: + chunk = "".join(pending_log_chunks) + pending_log_chunks.clear() + await _append_stage_log( + job_id, "surface_validate", chunk, + ) + + last_pct_sample = progress.overall_pct + last_db_write_ts = now_ts + await _recalculate_progress(job_id) + _push_update() + + # Buffer non-progress lines for the next flush. + if not is_progress_line: + pending_log_chunks.append(line) + + # Abort on bad block threshold (immediate, not + # throttled — finding bad blocks is urgent). if bad_blocks_total > settings.bad_block_threshold: await kill.kill_remote_process(job_id) output_lines.append( @@ -639,10 +666,6 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) ) return - if await _is_cancelled(job_id): - await kill.kill_remote_process(job_id) - return - await asyncio.gather( _drain(proc.stdout, False), _drain(proc.stderr, True), diff --git a/app/config.py b/app/config.py index d172ce2..f7dec05 100644 --- a/app/config.py +++ b/app/config.py @@ -86,7 +86,7 @@ class Settings(BaseSettings): ssh_key: str = "" # PEM private key content (paste full key including headers) # Application version — used by the /api/v1/updates/check endpoint - app_version: str = "1.0.0-53" + app_version: str = "1.0.0-54" # ---- Authentication (1.0.0-22) ---- # session_secret: HMAC key for signing session cookies. Empty = generate