fix: translate badblocks \b → \n at shell level (1.0.0-57)
The chunk-read drain in 1.0.0-55 was supposed to handle badblocks's \b-overwrite progress format but silently never surfaced data — DB bb_phase_pct stayed at 0, log_text stayed at 136 bytes for 26+ hours of running burn-ins. Asyncssh stream.read(4096) behavior on this combination of badblocks output + pipe characteristics wasn't doing what I expected, and gather(return_exceptions=True) swallowed any exception silently. Fix: pipe the badblocks output through `tr '\b' '\n'` at the SHELL level on TrueNAS, before it reaches asyncssh. Every progress update is now a real newline-terminated line by the time we receive it. This also lets us revert to the simpler `async for raw in stream:` drain we had pre-1.0.0-55 — which was proven to work (it caught the PID line and phase-transition headers, just not mid-phase progress). Plus consolidate: 2>&1 merges stderr into stdout before tr, so we only need ONE drain coroutine, not two. Single throttle gate preserved. Recovery: after deploy, the 4 jobs that have been stuck in pipe_w for 26h were autonomously reset via inline SQL and relaunched via POST /api/v1/burnin/start (loopback bypass from 1.0.0-56 made this possible without a session cookie).
This commit is contained in:
parent
f71ae341f5
commit
7c3873dd5e
2 changed files with 128 additions and 146 deletions
|
|
@ -101,20 +101,34 @@ class _BadblocksProgress:
|
||||||
def _build_badblocks_cmd(devname: str) -> str:
|
def _build_badblocks_cmd(devname: str) -> str:
|
||||||
"""Construct the wrapped badblocks command for a given device.
|
"""Construct the wrapped badblocks command for a given device.
|
||||||
|
|
||||||
Wraps badblocks under `sh -c 'echo PID:$$; exec ...'` so we can
|
badblocks's progress output uses '\\b' backspace characters to
|
||||||
capture the remote PID for out-of-band kill -9 (asyncssh's signal
|
overwrite the previous "XX% done" line — there's no '\\n' between
|
||||||
channel is ignored by sshd). Geometry (-b -c -p) is operator-tunable
|
updates until a phase transition. asyncssh's line-buffered reader
|
||||||
via Settings → Burn-in; defaults match the Spearfoot disk-burnin.sh
|
needs a real '\\n' to yield a line, so we pipe the output through
|
||||||
recommendation for large HDDs.
|
`tr '\\b' '\\n'` at the shell level. After this, every progress
|
||||||
|
update is a normal newline-terminated line.
|
||||||
|
|
||||||
|
Inner shell does `echo PID:$$; exec badblocks ...` so $$ is the
|
||||||
|
badblocks PID after exec (needed for out-of-band kill -9; asyncssh's
|
||||||
|
signal channel is ignored by sshd). 2>&1 merges stderr into stdout
|
||||||
|
so tr sees the progress lines (badblocks emits them on stderr).
|
||||||
|
|
||||||
|
Geometry (-b -c -p) is operator-tunable via Settings → Burn-in;
|
||||||
|
defaults match the Spearfoot disk-burnin.sh recommendation.
|
||||||
"""
|
"""
|
||||||
return (
|
inner = (
|
||||||
f"sh -c 'echo PID:$$; exec badblocks "
|
f"echo PID:$$; exec badblocks "
|
||||||
f"-wsv "
|
f"-wsv "
|
||||||
f"-b {settings.surface_validate_block_size} "
|
f"-b {settings.surface_validate_block_size} "
|
||||||
f"-c {settings.surface_validate_block_buffer} "
|
f"-c {settings.surface_validate_block_buffer} "
|
||||||
f"-p {settings.surface_validate_passes} "
|
f"-p {settings.surface_validate_passes} "
|
||||||
f"/dev/{devname}'"
|
f"/dev/{devname} 2>&1"
|
||||||
)
|
)
|
||||||
|
# The outer pipeline lets tr translate \\b → \\n. Single quotes
|
||||||
|
# around the inner script prevent the host shell from interpreting
|
||||||
|
# $$. The tr argument uses $'\\b' / $'\\n' escapes — POSIX shells
|
||||||
|
# interpret these as the literal backspace and newline bytes.
|
||||||
|
return f"sh -c '{inner}' | tr '\\b' '\\n'"
|
||||||
|
|
||||||
from . import kill
|
from . import kill
|
||||||
from ._common import (
|
from ._common import (
|
||||||
|
|
@ -543,152 +557,120 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int)
|
||||||
|
|
||||||
async def _drain(stream, is_stderr: bool):
|
async def _drain(stream, is_stderr: bool):
|
||||||
nonlocal bad_blocks_total, pid_seen
|
nonlocal bad_blocks_total, pid_seen
|
||||||
# Chunk-read instead of line-iterate. badblocks emits
|
# Line-based drain. The wrapped badblocks command
|
||||||
# progress with '\b' backspaces (and sometimes '\r')
|
# pipes through `tr '\b' '\n'` at the shell level
|
||||||
# to overwrite the previous progress line in-place —
|
# so every progress update is a real newline-
|
||||||
# there's no '\n' between updates until a phase
|
# terminated line by the time it reaches us.
|
||||||
# transition. async-for-line would buffer the entire
|
async for raw in stream:
|
||||||
# phase's output as ONE line, so the parser never
|
line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
|
||||||
# sees mid-phase percent updates. We read raw chunks,
|
if not line.strip():
|
||||||
# normalize \b runs + \r to \n, then process each
|
continue
|
||||||
# resulting fragment as a line. Keep the partial
|
|
||||||
# trailing fragment in buf for the next chunk.
|
|
||||||
buf = ""
|
|
||||||
while True:
|
|
||||||
chunk = await stream.read(4096)
|
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
if isinstance(chunk, bytes):
|
|
||||||
chunk = chunk.decode("utf-8", errors="replace")
|
|
||||||
buf += chunk
|
|
||||||
# Normalize \b runs + lone \r to \n so split() works.
|
|
||||||
normalized = _re_pre.sub(r"[\b]+", "\n", buf)
|
|
||||||
normalized = normalized.replace("\r", "\n")
|
|
||||||
fragments = normalized.split("\n")
|
|
||||||
# The tail is incomplete — keep it for the next chunk.
|
|
||||||
buf = fragments[-1]
|
|
||||||
for fragment in fragments[:-1]:
|
|
||||||
line = fragment + "\n"
|
|
||||||
if not line.strip():
|
|
||||||
continue
|
|
||||||
|
|
||||||
# First stdout line is "PID:<n>" from the
|
# First stdout line is "PID:<n>" from the
|
||||||
# wrapping shell. Capture and skip.
|
# wrapping shell. Capture and skip.
|
||||||
if not is_stderr and not pid_seen and line.startswith("PID:"):
|
if not is_stderr and not pid_seen and line.startswith("PID:"):
|
||||||
pid_seen = True
|
pid_seen = True
|
||||||
try:
|
try:
|
||||||
kill.set_remote_pid(job_id, int(line[4:].strip()))
|
kill.set_remote_pid(job_id, int(line[4:].strip()))
|
||||||
log.info(
|
log.info(
|
||||||
"Captured remote PID %d for job %d (badblocks)",
|
"Captured remote PID %d for job %d (badblocks)",
|
||||||
kill.get_remote_pid(job_id), job_id,
|
kill.get_remote_pid(job_id), job_id,
|
||||||
)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Drive progress.update from EVERY stderr line
|
|
||||||
# so it picks up "Testing with pattern 0xXX" +
|
|
||||||
# "Reading and comparing" headers, not just
|
|
||||||
# the percent-done lines. CPU-bound regex.
|
|
||||||
prev_phase = progress.phase
|
|
||||||
phase_changed = False
|
|
||||||
is_progress_line = False
|
|
||||||
if is_stderr:
|
|
||||||
progress.update(line)
|
|
||||||
phase_changed = progress.phase != prev_phase
|
|
||||||
is_progress_line = bool(_BB_PERCENT_RE.search(line))
|
|
||||||
else:
|
|
||||||
stripped = line.strip()
|
|
||||||
if stripped and stripped.isdigit():
|
|
||||||
bad_blocks_total += 1
|
|
||||||
|
|
||||||
# Keep the "XX% done" lines OUT of output_lines.
|
|
||||||
# They're the dominant volume; log_text concat
|
|
||||||
# is quadratic.
|
|
||||||
if not is_progress_line:
|
|
||||||
output_lines.append(line)
|
|
||||||
|
|
||||||
# Single throttle gate covering EVERY DB touch.
|
|
||||||
# Without this, the cumulative DB load makes
|
|
||||||
# the asyncssh drain fall behind, the SSH
|
|
||||||
# window stops advancing, sshd stops reading
|
|
||||||
# the pipe, badblocks blocks on pipe_write
|
|
||||||
# and no disk I/O happens (sectors_written
|
|
||||||
# delta of 0 confirmed the symptom).
|
|
||||||
now_ts = time.monotonic()
|
|
||||||
time_since_last_db = now_ts - last_db_write_ts
|
|
||||||
should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS
|
|
||||||
|
|
||||||
if should_write:
|
|
||||||
# 1) Cancellation check (was per-line)
|
|
||||||
if await _is_cancelled(job_id):
|
|
||||||
await kill.kill_remote_process(job_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
# 2) Per-pattern progress + history
|
|
||||||
if phase_changed:
|
|
||||||
await _record_bb_phase_start(
|
|
||||||
job_id, "surface_validate",
|
|
||||||
progress.phase, _now(),
|
|
||||||
)
|
|
||||||
await _update_stage_percent(
|
|
||||||
job_id, "surface_validate", progress.overall_pct,
|
|
||||||
)
|
|
||||||
await _update_stage_bb_phase(
|
|
||||||
job_id, "surface_validate",
|
|
||||||
progress.phase, progress.phase_pct,
|
|
||||||
)
|
|
||||||
await _update_stage_bad_blocks(
|
|
||||||
job_id, "surface_validate", bad_blocks_total,
|
|
||||||
)
|
)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
continue
|
||||||
|
|
||||||
# 3) Throughput. Skip phase transitions
|
# Note: with the `tr` pipe, badblocks's stderr is
|
||||||
# (per-phase pct resets → negative delta).
|
# merged into stdout (`2>&1`). is_stderr is now
|
||||||
if (
|
# always False — we treat every non-PID line as
|
||||||
drive_size_bytes
|
# potentially containing progress or bad-block
|
||||||
and not phase_changed
|
# output. The phase parser is idempotent on
|
||||||
and progress.overall_pct > last_pct_sample
|
# unrelated lines.
|
||||||
and time_since_last_db >= 1.0
|
prev_phase = progress.phase
|
||||||
):
|
progress.update(line)
|
||||||
d_pct = progress.overall_pct - last_pct_sample
|
phase_changed = progress.phase != prev_phase
|
||||||
bytes_done = (d_pct / 800.0) * drive_size_bytes
|
is_progress_line = bool(_BB_PERCENT_RE.search(line))
|
||||||
mbps = bytes_done / time_since_last_db / 1_000_000
|
# Bare-number lines from badblocks are bad-block
|
||||||
await _update_stage_bb_mbps(
|
# block numbers (one per line on stdout).
|
||||||
job_id, "surface_validate", mbps,
|
stripped = line.strip()
|
||||||
)
|
if stripped and stripped.isdigit() and not is_progress_line:
|
||||||
|
bad_blocks_total += 1
|
||||||
|
|
||||||
# 4) Log flush.
|
# Keep "XX% done" lines OUT of output_lines. Big
|
||||||
if pending_log_chunks:
|
# volume + quadratic log_text concat.
|
||||||
chunk = "".join(pending_log_chunks)
|
if not is_progress_line:
|
||||||
pending_log_chunks.clear()
|
output_lines.append(line)
|
||||||
await _append_stage_log(
|
|
||||||
job_id, "surface_validate", chunk,
|
|
||||||
)
|
|
||||||
|
|
||||||
last_pct_sample = progress.overall_pct
|
# Single throttle gate covering EVERY DB touch.
|
||||||
last_db_write_ts = now_ts
|
# Cumulative DB load otherwise overwhelms the
|
||||||
await _recalculate_progress(job_id)
|
# asyncio loop → asyncssh drain falls behind →
|
||||||
_push_update()
|
# SSH window stops advancing → pipe fills →
|
||||||
|
# badblocks blocks on pipe_write → disk I/O stops.
|
||||||
|
now_ts = time.monotonic()
|
||||||
|
time_since_last_db = now_ts - last_db_write_ts
|
||||||
|
should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS
|
||||||
|
|
||||||
# Buffer non-progress lines for the next flush.
|
if should_write:
|
||||||
if not is_progress_line:
|
if await _is_cancelled(job_id):
|
||||||
pending_log_chunks.append(line)
|
|
||||||
|
|
||||||
# Abort on bad block threshold (immediate, not
|
|
||||||
# throttled — finding bad blocks is urgent).
|
|
||||||
if bad_blocks_total > settings.bad_block_threshold:
|
|
||||||
await kill.kill_remote_process(job_id)
|
await kill.kill_remote_process(job_id)
|
||||||
output_lines.append(
|
|
||||||
f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded "
|
|
||||||
f"threshold ({settings.bad_block_threshold})\n"
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
await asyncio.gather(
|
if phase_changed:
|
||||||
_drain(proc.stdout, False),
|
await _record_bb_phase_start(
|
||||||
_drain(proc.stderr, True),
|
job_id, "surface_validate",
|
||||||
return_exceptions=True,
|
progress.phase, _now(),
|
||||||
)
|
)
|
||||||
|
await _update_stage_percent(
|
||||||
|
job_id, "surface_validate", progress.overall_pct,
|
||||||
|
)
|
||||||
|
await _update_stage_bb_phase(
|
||||||
|
job_id, "surface_validate",
|
||||||
|
progress.phase, progress.phase_pct,
|
||||||
|
)
|
||||||
|
await _update_stage_bad_blocks(
|
||||||
|
job_id, "surface_validate", bad_blocks_total,
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
drive_size_bytes
|
||||||
|
and not phase_changed
|
||||||
|
and progress.overall_pct > last_pct_sample
|
||||||
|
and time_since_last_db >= 1.0
|
||||||
|
):
|
||||||
|
d_pct = progress.overall_pct - last_pct_sample
|
||||||
|
bytes_done = (d_pct / 800.0) * drive_size_bytes
|
||||||
|
mbps = bytes_done / time_since_last_db / 1_000_000
|
||||||
|
await _update_stage_bb_mbps(
|
||||||
|
job_id, "surface_validate", mbps,
|
||||||
|
)
|
||||||
|
|
||||||
|
if pending_log_chunks:
|
||||||
|
chunk = "".join(pending_log_chunks)
|
||||||
|
pending_log_chunks.clear()
|
||||||
|
await _append_stage_log(
|
||||||
|
job_id, "surface_validate", chunk,
|
||||||
|
)
|
||||||
|
|
||||||
|
last_pct_sample = progress.overall_pct
|
||||||
|
last_db_write_ts = now_ts
|
||||||
|
await _recalculate_progress(job_id)
|
||||||
|
_push_update()
|
||||||
|
|
||||||
|
if not is_progress_line:
|
||||||
|
pending_log_chunks.append(line)
|
||||||
|
|
||||||
|
# Abort on bad block threshold — immediate.
|
||||||
|
if bad_blocks_total > settings.bad_block_threshold:
|
||||||
|
await kill.kill_remote_process(job_id)
|
||||||
|
output_lines.append(
|
||||||
|
f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded "
|
||||||
|
f"threshold ({settings.bad_block_threshold})\n"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Single stream now — the `2>&1` in _build_badblocks_cmd
|
||||||
|
# merges stderr into stdout before the `tr` pipe.
|
||||||
|
await _drain(proc.stdout, False)
|
||||||
# Bound proc.wait so a remote process that ignored our kill
|
# Bound proc.wait so a remote process that ignored our kill
|
||||||
# signal (or that we never managed to kill) can't pin this
|
# signal (or that we never managed to kill) can't pin this
|
||||||
# task in the semaphore forever. Closing the connection on
|
# task in the semaphore forever. Closing the connection on
|
||||||
|
|
|
||||||
|
|
@ -86,7 +86,7 @@ class Settings(BaseSettings):
|
||||||
ssh_key: str = "" # PEM private key content (paste full key including headers)
|
ssh_key: str = "" # PEM private key content (paste full key including headers)
|
||||||
|
|
||||||
# Application version — used by the /api/v1/updates/check endpoint
|
# Application version — used by the /api/v1/updates/check endpoint
|
||||||
app_version: str = "1.0.0-56"
|
app_version: str = "1.0.0-57"
|
||||||
|
|
||||||
# ---- Authentication (1.0.0-22) ----
|
# ---- Authentication (1.0.0-22) ----
|
||||||
# session_secret: HMAC key for signing session cookies. Empty = generate
|
# session_secret: HMAC key for signing session cookies. Empty = generate
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue