From f71ae341f52eed82d20859bd1a14a46e804469dd Mon Sep 17 00:00:00 2001 From: Brandon Walter <51866976+echoparkbaby@users.noreply.github.com> Date: Tue, 12 May 2026 07:53:33 -0700 Subject: [PATCH] fix: backport stages.py \b-parser fix + drawer-finish inline (uncommitted from 1.0.0-55) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunk-read parser fix that ships as part of 1.0.0-55 in the running container was scp'd to maple but never reached git. Same for the drawer-job-finish margin-left removal (request: pill sits inline next to operator/date, not flush right). Reconciling source with deployed state. No new behaviour — git now matches what's been live on maple since 1.0.0-55. --- app/burnin/stages.py | 248 +++++++++++++++++++++++-------------------- app/static/app.css | 1 - 2 files changed, 133 insertions(+), 116 deletions(-) diff --git a/app/burnin/stages.py b/app/burnin/stages.py index f7c4b2c..24e9d97 100644 --- a/app/burnin/stages.py +++ b/app/burnin/stages.py @@ -543,128 +543,146 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) async def _drain(stream, is_stderr: bool): nonlocal bad_blocks_total, pid_seen - async for raw in stream: - line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace") + # Chunk-read instead of line-iterate. badblocks emits + # progress with '\b' backspaces (and sometimes '\r') + # to overwrite the previous progress line in-place — + # there's no '\n' between updates until a phase + # transition. async-for-line would buffer the entire + # phase's output as ONE line, so the parser never + # sees mid-phase percent updates. We read raw chunks, + # normalize \b runs + \r to \n, then process each + # resulting fragment as a line. Keep the partial + # trailing fragment in buf for the next chunk. + buf = "" + while True: + chunk = await stream.read(4096) + if not chunk: + break + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8", errors="replace") + buf += chunk + # Normalize \b runs + lone \r to \n so split() works. + normalized = _re_pre.sub(r"[\b]+", "\n", buf) + normalized = normalized.replace("\r", "\n") + fragments = normalized.split("\n") + # The tail is incomplete — keep it for the next chunk. + buf = fragments[-1] + for fragment in fragments[:-1]: + line = fragment + "\n" + if not line.strip(): + continue - # First stdout line is "PID:" from the wrapping shell. - # Capture it and don't append it to the user-visible log. - if not is_stderr and not pid_seen and line.startswith("PID:"): - pid_seen = True - try: - kill.set_remote_pid(job_id, int(line[4:].strip())) - log.info( - "Captured remote PID %d for job %d (badblocks)", - kill.get_remote_pid(job_id), job_id, + # First stdout line is "PID:" from the + # wrapping shell. Capture and skip. + if not is_stderr and not pid_seen and line.startswith("PID:"): + pid_seen = True + try: + kill.set_remote_pid(job_id, int(line[4:].strip())) + log.info( + "Captured remote PID %d for job %d (badblocks)", + kill.get_remote_pid(job_id), job_id, + ) + except ValueError: + pass + continue + + # Drive progress.update from EVERY stderr line + # so it picks up "Testing with pattern 0xXX" + + # "Reading and comparing" headers, not just + # the percent-done lines. CPU-bound regex. + prev_phase = progress.phase + phase_changed = False + is_progress_line = False + if is_stderr: + progress.update(line) + phase_changed = progress.phase != prev_phase + is_progress_line = bool(_BB_PERCENT_RE.search(line)) + else: + stripped = line.strip() + if stripped and stripped.isdigit(): + bad_blocks_total += 1 + + # Keep the "XX% done" lines OUT of output_lines. + # They're the dominant volume; log_text concat + # is quadratic. + if not is_progress_line: + output_lines.append(line) + + # Single throttle gate covering EVERY DB touch. + # Without this, the cumulative DB load makes + # the asyncssh drain fall behind, the SSH + # window stops advancing, sshd stops reading + # the pipe, badblocks blocks on pipe_write + # and no disk I/O happens (sectors_written + # delta of 0 confirmed the symptom). + now_ts = time.monotonic() + time_since_last_db = now_ts - last_db_write_ts + should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS + + if should_write: + # 1) Cancellation check (was per-line) + if await _is_cancelled(job_id): + await kill.kill_remote_process(job_id) + return + + # 2) Per-pattern progress + history + if phase_changed: + await _record_bb_phase_start( + job_id, "surface_validate", + progress.phase, _now(), + ) + await _update_stage_percent( + job_id, "surface_validate", progress.overall_pct, ) - except ValueError: - pass - continue - - # Drive progress.update from EVERY stderr line so it - # picks up the "Testing with pattern 0xXX" + "Reading - # and comparing" headers (which advance the phase - # counter), not just the percent-done lines. CPU- - # bound regex work, no I/O, safe to do unconditionally. - prev_phase = progress.phase - phase_changed = False - is_progress_line = False - if is_stderr: - progress.update(line) - phase_changed = progress.phase != prev_phase - is_progress_line = bool(_BB_PERCENT_RE.search(line)) - else: - stripped = line.strip() - if stripped and stripped.isdigit(): - bad_blocks_total += 1 - - # Keep the "XX% done" lines OUT of output_lines. - # They're the dominant volume (sub-second cadence - # for hours) and the log_text concat is quadratic - # in length. Headers, errors, bad-block numbers, - # and other diagnostic output still get logged. - if not is_progress_line: - output_lines.append(line) - - # Single throttle gate covering EVERY DB touch in - # this loop. Without this, _is_cancelled + percent - # writes + chunk-log appends collectively overwhelm - # the asyncio loop, the asyncssh drain falls behind, - # the SSH window stops advancing, sshd stops reading - # the pipe, badblocks blocks on pipe_write and no - # disk I/O happens. Symptom: badblocks pid in - # wchan=pipe_write with /sys/block sectors_written - # delta of 0. - now_ts = time.monotonic() - time_since_last_db = now_ts - last_db_write_ts - should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS - - if should_write: - # 1) Cancellation check (was per-line) - if await _is_cancelled(job_id): - await kill.kill_remote_process(job_id) - return - - # 2) Per-pattern progress + history - if phase_changed: - await _record_bb_phase_start( + await _update_stage_bb_phase( job_id, "surface_validate", - progress.phase, _now(), + progress.phase, progress.phase_pct, ) - await _update_stage_percent( - job_id, "surface_validate", progress.overall_pct, - ) - await _update_stage_bb_phase( - job_id, "surface_validate", - progress.phase, progress.phase_pct, - ) - await _update_stage_bad_blocks( - job_id, "surface_validate", bad_blocks_total, - ) - - # 3) Throughput. Skip phase transitions (per- - # phase pct resets would yield a negative delta). - if ( - drive_size_bytes - and not phase_changed - and progress.overall_pct > last_pct_sample - and time_since_last_db >= 1.0 - ): - d_pct = progress.overall_pct - last_pct_sample - bytes_done = (d_pct / 800.0) * drive_size_bytes - mbps = bytes_done / time_since_last_db / 1_000_000 - await _update_stage_bb_mbps( - job_id, "surface_validate", mbps, + await _update_stage_bad_blocks( + job_id, "surface_validate", bad_blocks_total, ) - # 4) Log flush. Chunk what's accumulated since - # the last write. log_text concat is quadratic - # so the volume reduction from skipping "% done" - # lines above matters MORE than the throttle. - if pending_log_chunks: - chunk = "".join(pending_log_chunks) - pending_log_chunks.clear() - await _append_stage_log( - job_id, "surface_validate", chunk, + # 3) Throughput. Skip phase transitions + # (per-phase pct resets → negative delta). + if ( + drive_size_bytes + and not phase_changed + and progress.overall_pct > last_pct_sample + and time_since_last_db >= 1.0 + ): + d_pct = progress.overall_pct - last_pct_sample + bytes_done = (d_pct / 800.0) * drive_size_bytes + mbps = bytes_done / time_since_last_db / 1_000_000 + await _update_stage_bb_mbps( + job_id, "surface_validate", mbps, + ) + + # 4) Log flush. + if pending_log_chunks: + chunk = "".join(pending_log_chunks) + pending_log_chunks.clear() + await _append_stage_log( + job_id, "surface_validate", chunk, + ) + + last_pct_sample = progress.overall_pct + last_db_write_ts = now_ts + await _recalculate_progress(job_id) + _push_update() + + # Buffer non-progress lines for the next flush. + if not is_progress_line: + pending_log_chunks.append(line) + + # Abort on bad block threshold (immediate, not + # throttled — finding bad blocks is urgent). + if bad_blocks_total > settings.bad_block_threshold: + await kill.kill_remote_process(job_id) + output_lines.append( + f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded " + f"threshold ({settings.bad_block_threshold})\n" ) - - last_pct_sample = progress.overall_pct - last_db_write_ts = now_ts - await _recalculate_progress(job_id) - _push_update() - - # Buffer non-progress lines for the next flush. - if not is_progress_line: - pending_log_chunks.append(line) - - # Abort on bad block threshold (immediate, not - # throttled — finding bad blocks is urgent). - if bad_blocks_total > settings.bad_block_threshold: - await kill.kill_remote_process(job_id) - output_lines.append( - f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded " - f"threshold ({settings.bad_block_threshold})\n" - ) - return + return await asyncio.gather( _drain(proc.stdout, False), diff --git a/app/static/app.css b/app/static/app.css index c1f30f5..9a79e84 100644 --- a/app/static/app.css +++ b/app/static/app.css @@ -2951,7 +2951,6 @@ th.sort-desc::after { flex-wrap: wrap; } .drawer-job-finish { - margin-left: auto; display: inline-flex; align-items: baseline; gap: 8px;