From 7c3873dd5e5231459b94b10a6f7ea976157fff14 Mon Sep 17 00:00:00 2001 From: Brandon Walter <51866976+echoparkbaby@users.noreply.github.com> Date: Wed, 13 May 2026 10:26:06 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20translate=20badblocks=20\b=20=E2=86=92?= =?UTF-8?q?=20\n=20at=20shell=20level=20(1.0.0-57)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunk-read drain in 1.0.0-55 was supposed to handle badblocks's \b-overwrite progress format but silently never surfaced data — DB bb_phase_pct stayed at 0, log_text stayed at 136 bytes for 26+ hours of running burn-ins. Asyncssh stream.read(4096) behavior on this combination of badblocks output + pipe characteristics wasn't doing what I expected, and gather(return_exceptions=True) swallowed any exception silently. Fix: pipe the badblocks output through `tr '\b' '\n'` at the SHELL level on TrueNAS, before it reaches asyncssh. Every progress update is now a real newline-terminated line by the time we receive it. This also lets us revert to the simpler `async for raw in stream:` drain we had pre-1.0.0-55 — which was proven to work (it caught the PID line and phase-transition headers, just not mid-phase progress). Plus consolidate: 2>&1 merges stderr into stdout before tr, so we only need ONE drain coroutine, not two. Single throttle gate preserved. Recovery: after deploy, the 4 jobs that have been stuck in pipe_w for 26h were autonomously reset via inline SQL and relaunched via POST /api/v1/burnin/start (loopback bypass from 1.0.0-56 made this possible without a session cookie). --- app/burnin/stages.py | 272 ++++++++++++++++++++----------------------- app/config.py | 2 +- 2 files changed, 128 insertions(+), 146 deletions(-) diff --git a/app/burnin/stages.py b/app/burnin/stages.py index 24e9d97..15ad3ca 100644 --- a/app/burnin/stages.py +++ b/app/burnin/stages.py @@ -101,20 +101,34 @@ class _BadblocksProgress: def _build_badblocks_cmd(devname: str) -> str: """Construct the wrapped badblocks command for a given device. - Wraps badblocks under `sh -c 'echo PID:$$; exec ...'` so we can - capture the remote PID for out-of-band kill -9 (asyncssh's signal - channel is ignored by sshd). Geometry (-b -c -p) is operator-tunable - via Settings → Burn-in; defaults match the Spearfoot disk-burnin.sh - recommendation for large HDDs. + badblocks's progress output uses '\\b' backspace characters to + overwrite the previous "XX% done" line — there's no '\\n' between + updates until a phase transition. asyncssh's line-buffered reader + needs a real '\\n' to yield a line, so we pipe the output through + `tr '\\b' '\\n'` at the shell level. After this, every progress + update is a normal newline-terminated line. + + Inner shell does `echo PID:$$; exec badblocks ...` so $$ is the + badblocks PID after exec (needed for out-of-band kill -9; asyncssh's + signal channel is ignored by sshd). 2>&1 merges stderr into stdout + so tr sees the progress lines (badblocks emits them on stderr). + + Geometry (-b -c -p) is operator-tunable via Settings → Burn-in; + defaults match the Spearfoot disk-burnin.sh recommendation. """ - return ( - f"sh -c 'echo PID:$$; exec badblocks " + inner = ( + f"echo PID:$$; exec badblocks " f"-wsv " f"-b {settings.surface_validate_block_size} " f"-c {settings.surface_validate_block_buffer} " f"-p {settings.surface_validate_passes} " - f"/dev/{devname}'" + f"/dev/{devname} 2>&1" ) + # The outer pipeline lets tr translate \\b → \\n. Single quotes + # around the inner script prevent the host shell from interpreting + # $$. The tr argument uses $'\\b' / $'\\n' escapes — POSIX shells + # interpret these as the literal backspace and newline bytes. + return f"sh -c '{inner}' | tr '\\b' '\\n'" from . import kill from ._common import ( @@ -543,152 +557,120 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int) async def _drain(stream, is_stderr: bool): nonlocal bad_blocks_total, pid_seen - # Chunk-read instead of line-iterate. badblocks emits - # progress with '\b' backspaces (and sometimes '\r') - # to overwrite the previous progress line in-place — - # there's no '\n' between updates until a phase - # transition. async-for-line would buffer the entire - # phase's output as ONE line, so the parser never - # sees mid-phase percent updates. We read raw chunks, - # normalize \b runs + \r to \n, then process each - # resulting fragment as a line. Keep the partial - # trailing fragment in buf for the next chunk. - buf = "" - while True: - chunk = await stream.read(4096) - if not chunk: - break - if isinstance(chunk, bytes): - chunk = chunk.decode("utf-8", errors="replace") - buf += chunk - # Normalize \b runs + lone \r to \n so split() works. - normalized = _re_pre.sub(r"[\b]+", "\n", buf) - normalized = normalized.replace("\r", "\n") - fragments = normalized.split("\n") - # The tail is incomplete — keep it for the next chunk. - buf = fragments[-1] - for fragment in fragments[:-1]: - line = fragment + "\n" - if not line.strip(): - continue + # Line-based drain. The wrapped badblocks command + # pipes through `tr '\b' '\n'` at the shell level + # so every progress update is a real newline- + # terminated line by the time it reaches us. + async for raw in stream: + line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace") + if not line.strip(): + continue - # First stdout line is "PID:" from the - # wrapping shell. Capture and skip. - if not is_stderr and not pid_seen and line.startswith("PID:"): - pid_seen = True - try: - kill.set_remote_pid(job_id, int(line[4:].strip())) - log.info( - "Captured remote PID %d for job %d (badblocks)", - kill.get_remote_pid(job_id), job_id, - ) - except ValueError: - pass - continue - - # Drive progress.update from EVERY stderr line - # so it picks up "Testing with pattern 0xXX" + - # "Reading and comparing" headers, not just - # the percent-done lines. CPU-bound regex. - prev_phase = progress.phase - phase_changed = False - is_progress_line = False - if is_stderr: - progress.update(line) - phase_changed = progress.phase != prev_phase - is_progress_line = bool(_BB_PERCENT_RE.search(line)) - else: - stripped = line.strip() - if stripped and stripped.isdigit(): - bad_blocks_total += 1 - - # Keep the "XX% done" lines OUT of output_lines. - # They're the dominant volume; log_text concat - # is quadratic. - if not is_progress_line: - output_lines.append(line) - - # Single throttle gate covering EVERY DB touch. - # Without this, the cumulative DB load makes - # the asyncssh drain fall behind, the SSH - # window stops advancing, sshd stops reading - # the pipe, badblocks blocks on pipe_write - # and no disk I/O happens (sectors_written - # delta of 0 confirmed the symptom). - now_ts = time.monotonic() - time_since_last_db = now_ts - last_db_write_ts - should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS - - if should_write: - # 1) Cancellation check (was per-line) - if await _is_cancelled(job_id): - await kill.kill_remote_process(job_id) - return - - # 2) Per-pattern progress + history - if phase_changed: - await _record_bb_phase_start( - job_id, "surface_validate", - progress.phase, _now(), - ) - await _update_stage_percent( - job_id, "surface_validate", progress.overall_pct, - ) - await _update_stage_bb_phase( - job_id, "surface_validate", - progress.phase, progress.phase_pct, - ) - await _update_stage_bad_blocks( - job_id, "surface_validate", bad_blocks_total, + # First stdout line is "PID:" from the + # wrapping shell. Capture and skip. + if not is_stderr and not pid_seen and line.startswith("PID:"): + pid_seen = True + try: + kill.set_remote_pid(job_id, int(line[4:].strip())) + log.info( + "Captured remote PID %d for job %d (badblocks)", + kill.get_remote_pid(job_id), job_id, ) + except ValueError: + pass + continue - # 3) Throughput. Skip phase transitions - # (per-phase pct resets → negative delta). - if ( - drive_size_bytes - and not phase_changed - and progress.overall_pct > last_pct_sample - and time_since_last_db >= 1.0 - ): - d_pct = progress.overall_pct - last_pct_sample - bytes_done = (d_pct / 800.0) * drive_size_bytes - mbps = bytes_done / time_since_last_db / 1_000_000 - await _update_stage_bb_mbps( - job_id, "surface_validate", mbps, - ) + # Note: with the `tr` pipe, badblocks's stderr is + # merged into stdout (`2>&1`). is_stderr is now + # always False — we treat every non-PID line as + # potentially containing progress or bad-block + # output. The phase parser is idempotent on + # unrelated lines. + prev_phase = progress.phase + progress.update(line) + phase_changed = progress.phase != prev_phase + is_progress_line = bool(_BB_PERCENT_RE.search(line)) + # Bare-number lines from badblocks are bad-block + # block numbers (one per line on stdout). + stripped = line.strip() + if stripped and stripped.isdigit() and not is_progress_line: + bad_blocks_total += 1 - # 4) Log flush. - if pending_log_chunks: - chunk = "".join(pending_log_chunks) - pending_log_chunks.clear() - await _append_stage_log( - job_id, "surface_validate", chunk, - ) + # Keep "XX% done" lines OUT of output_lines. Big + # volume + quadratic log_text concat. + if not is_progress_line: + output_lines.append(line) - last_pct_sample = progress.overall_pct - last_db_write_ts = now_ts - await _recalculate_progress(job_id) - _push_update() + # Single throttle gate covering EVERY DB touch. + # Cumulative DB load otherwise overwhelms the + # asyncio loop → asyncssh drain falls behind → + # SSH window stops advancing → pipe fills → + # badblocks blocks on pipe_write → disk I/O stops. + now_ts = time.monotonic() + time_since_last_db = now_ts - last_db_write_ts + should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS - # Buffer non-progress lines for the next flush. - if not is_progress_line: - pending_log_chunks.append(line) - - # Abort on bad block threshold (immediate, not - # throttled — finding bad blocks is urgent). - if bad_blocks_total > settings.bad_block_threshold: + if should_write: + if await _is_cancelled(job_id): await kill.kill_remote_process(job_id) - output_lines.append( - f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded " - f"threshold ({settings.bad_block_threshold})\n" - ) return - await asyncio.gather( - _drain(proc.stdout, False), - _drain(proc.stderr, True), - return_exceptions=True, - ) + if phase_changed: + await _record_bb_phase_start( + job_id, "surface_validate", + progress.phase, _now(), + ) + await _update_stage_percent( + job_id, "surface_validate", progress.overall_pct, + ) + await _update_stage_bb_phase( + job_id, "surface_validate", + progress.phase, progress.phase_pct, + ) + await _update_stage_bad_blocks( + job_id, "surface_validate", bad_blocks_total, + ) + + if ( + drive_size_bytes + and not phase_changed + and progress.overall_pct > last_pct_sample + and time_since_last_db >= 1.0 + ): + d_pct = progress.overall_pct - last_pct_sample + bytes_done = (d_pct / 800.0) * drive_size_bytes + mbps = bytes_done / time_since_last_db / 1_000_000 + await _update_stage_bb_mbps( + job_id, "surface_validate", mbps, + ) + + if pending_log_chunks: + chunk = "".join(pending_log_chunks) + pending_log_chunks.clear() + await _append_stage_log( + job_id, "surface_validate", chunk, + ) + + last_pct_sample = progress.overall_pct + last_db_write_ts = now_ts + await _recalculate_progress(job_id) + _push_update() + + if not is_progress_line: + pending_log_chunks.append(line) + + # Abort on bad block threshold — immediate. + if bad_blocks_total > settings.bad_block_threshold: + await kill.kill_remote_process(job_id) + output_lines.append( + f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded " + f"threshold ({settings.bad_block_threshold})\n" + ) + return + + # Single stream now — the `2>&1` in _build_badblocks_cmd + # merges stderr into stdout before the `tr` pipe. + await _drain(proc.stdout, False) # Bound proc.wait so a remote process that ignored our kill # signal (or that we never managed to kill) can't pin this # task in the semaphore forever. Closing the connection on diff --git a/app/config.py b/app/config.py index 40f6e93..df93830 100644 --- a/app/config.py +++ b/app/config.py @@ -86,7 +86,7 @@ class Settings(BaseSettings): ssh_key: str = "" # PEM private key content (paste full key including headers) # Application version — used by the /api/v1/updates/check endpoint - app_version: str = "1.0.0-56" + app_version: str = "1.0.0-57" # ---- Authentication (1.0.0-22) ---- # session_secret: HMAC key for signing session cookies. Empty = generate