fix: backport stages.py \b-parser fix + drawer-finish inline (uncommitted from 1.0.0-55)
The chunk-read parser fix that ships as part of 1.0.0-55 in the running container was scp'd to maple but never reached git. Same for the drawer-job-finish margin-left removal (request: pill sits inline next to operator/date, not flush right). Reconciling source with deployed state. No new behaviour — git now matches what's been live on maple since 1.0.0-55.
This commit is contained in:
parent
71eac9cba0
commit
f71ae341f5
2 changed files with 133 additions and 116 deletions
|
|
@ -543,128 +543,146 @@ async def _stage_surface_validate_ssh(job_id: int, devname: str, drive_id: int)
|
|||
|
||||
async def _drain(stream, is_stderr: bool):
|
||||
nonlocal bad_blocks_total, pid_seen
|
||||
async for raw in stream:
|
||||
line = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
|
||||
# Chunk-read instead of line-iterate. badblocks emits
|
||||
# progress with '\b' backspaces (and sometimes '\r')
|
||||
# to overwrite the previous progress line in-place —
|
||||
# there's no '\n' between updates until a phase
|
||||
# transition. async-for-line would buffer the entire
|
||||
# phase's output as ONE line, so the parser never
|
||||
# sees mid-phase percent updates. We read raw chunks,
|
||||
# normalize \b runs + \r to \n, then process each
|
||||
# resulting fragment as a line. Keep the partial
|
||||
# trailing fragment in buf for the next chunk.
|
||||
buf = ""
|
||||
while True:
|
||||
chunk = await stream.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
if isinstance(chunk, bytes):
|
||||
chunk = chunk.decode("utf-8", errors="replace")
|
||||
buf += chunk
|
||||
# Normalize \b runs + lone \r to \n so split() works.
|
||||
normalized = _re_pre.sub(r"[\b]+", "\n", buf)
|
||||
normalized = normalized.replace("\r", "\n")
|
||||
fragments = normalized.split("\n")
|
||||
# The tail is incomplete — keep it for the next chunk.
|
||||
buf = fragments[-1]
|
||||
for fragment in fragments[:-1]:
|
||||
line = fragment + "\n"
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
# First stdout line is "PID:<n>" from the wrapping shell.
|
||||
# Capture it and don't append it to the user-visible log.
|
||||
if not is_stderr and not pid_seen and line.startswith("PID:"):
|
||||
pid_seen = True
|
||||
try:
|
||||
kill.set_remote_pid(job_id, int(line[4:].strip()))
|
||||
log.info(
|
||||
"Captured remote PID %d for job %d (badblocks)",
|
||||
kill.get_remote_pid(job_id), job_id,
|
||||
# First stdout line is "PID:<n>" from the
|
||||
# wrapping shell. Capture and skip.
|
||||
if not is_stderr and not pid_seen and line.startswith("PID:"):
|
||||
pid_seen = True
|
||||
try:
|
||||
kill.set_remote_pid(job_id, int(line[4:].strip()))
|
||||
log.info(
|
||||
"Captured remote PID %d for job %d (badblocks)",
|
||||
kill.get_remote_pid(job_id), job_id,
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Drive progress.update from EVERY stderr line
|
||||
# so it picks up "Testing with pattern 0xXX" +
|
||||
# "Reading and comparing" headers, not just
|
||||
# the percent-done lines. CPU-bound regex.
|
||||
prev_phase = progress.phase
|
||||
phase_changed = False
|
||||
is_progress_line = False
|
||||
if is_stderr:
|
||||
progress.update(line)
|
||||
phase_changed = progress.phase != prev_phase
|
||||
is_progress_line = bool(_BB_PERCENT_RE.search(line))
|
||||
else:
|
||||
stripped = line.strip()
|
||||
if stripped and stripped.isdigit():
|
||||
bad_blocks_total += 1
|
||||
|
||||
# Keep the "XX% done" lines OUT of output_lines.
|
||||
# They're the dominant volume; log_text concat
|
||||
# is quadratic.
|
||||
if not is_progress_line:
|
||||
output_lines.append(line)
|
||||
|
||||
# Single throttle gate covering EVERY DB touch.
|
||||
# Without this, the cumulative DB load makes
|
||||
# the asyncssh drain fall behind, the SSH
|
||||
# window stops advancing, sshd stops reading
|
||||
# the pipe, badblocks blocks on pipe_write
|
||||
# and no disk I/O happens (sectors_written
|
||||
# delta of 0 confirmed the symptom).
|
||||
now_ts = time.monotonic()
|
||||
time_since_last_db = now_ts - last_db_write_ts
|
||||
should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS
|
||||
|
||||
if should_write:
|
||||
# 1) Cancellation check (was per-line)
|
||||
if await _is_cancelled(job_id):
|
||||
await kill.kill_remote_process(job_id)
|
||||
return
|
||||
|
||||
# 2) Per-pattern progress + history
|
||||
if phase_changed:
|
||||
await _record_bb_phase_start(
|
||||
job_id, "surface_validate",
|
||||
progress.phase, _now(),
|
||||
)
|
||||
await _update_stage_percent(
|
||||
job_id, "surface_validate", progress.overall_pct,
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Drive progress.update from EVERY stderr line so it
|
||||
# picks up the "Testing with pattern 0xXX" + "Reading
|
||||
# and comparing" headers (which advance the phase
|
||||
# counter), not just the percent-done lines. CPU-
|
||||
# bound regex work, no I/O, safe to do unconditionally.
|
||||
prev_phase = progress.phase
|
||||
phase_changed = False
|
||||
is_progress_line = False
|
||||
if is_stderr:
|
||||
progress.update(line)
|
||||
phase_changed = progress.phase != prev_phase
|
||||
is_progress_line = bool(_BB_PERCENT_RE.search(line))
|
||||
else:
|
||||
stripped = line.strip()
|
||||
if stripped and stripped.isdigit():
|
||||
bad_blocks_total += 1
|
||||
|
||||
# Keep the "XX% done" lines OUT of output_lines.
|
||||
# They're the dominant volume (sub-second cadence
|
||||
# for hours) and the log_text concat is quadratic
|
||||
# in length. Headers, errors, bad-block numbers,
|
||||
# and other diagnostic output still get logged.
|
||||
if not is_progress_line:
|
||||
output_lines.append(line)
|
||||
|
||||
# Single throttle gate covering EVERY DB touch in
|
||||
# this loop. Without this, _is_cancelled + percent
|
||||
# writes + chunk-log appends collectively overwhelm
|
||||
# the asyncio loop, the asyncssh drain falls behind,
|
||||
# the SSH window stops advancing, sshd stops reading
|
||||
# the pipe, badblocks blocks on pipe_write and no
|
||||
# disk I/O happens. Symptom: badblocks pid in
|
||||
# wchan=pipe_write with /sys/block sectors_written
|
||||
# delta of 0.
|
||||
now_ts = time.monotonic()
|
||||
time_since_last_db = now_ts - last_db_write_ts
|
||||
should_write = phase_changed or time_since_last_db >= BB_DB_MIN_SECONDS
|
||||
|
||||
if should_write:
|
||||
# 1) Cancellation check (was per-line)
|
||||
if await _is_cancelled(job_id):
|
||||
await kill.kill_remote_process(job_id)
|
||||
return
|
||||
|
||||
# 2) Per-pattern progress + history
|
||||
if phase_changed:
|
||||
await _record_bb_phase_start(
|
||||
await _update_stage_bb_phase(
|
||||
job_id, "surface_validate",
|
||||
progress.phase, _now(),
|
||||
progress.phase, progress.phase_pct,
|
||||
)
|
||||
await _update_stage_percent(
|
||||
job_id, "surface_validate", progress.overall_pct,
|
||||
)
|
||||
await _update_stage_bb_phase(
|
||||
job_id, "surface_validate",
|
||||
progress.phase, progress.phase_pct,
|
||||
)
|
||||
await _update_stage_bad_blocks(
|
||||
job_id, "surface_validate", bad_blocks_total,
|
||||
)
|
||||
|
||||
# 3) Throughput. Skip phase transitions (per-
|
||||
# phase pct resets would yield a negative delta).
|
||||
if (
|
||||
drive_size_bytes
|
||||
and not phase_changed
|
||||
and progress.overall_pct > last_pct_sample
|
||||
and time_since_last_db >= 1.0
|
||||
):
|
||||
d_pct = progress.overall_pct - last_pct_sample
|
||||
bytes_done = (d_pct / 800.0) * drive_size_bytes
|
||||
mbps = bytes_done / time_since_last_db / 1_000_000
|
||||
await _update_stage_bb_mbps(
|
||||
job_id, "surface_validate", mbps,
|
||||
await _update_stage_bad_blocks(
|
||||
job_id, "surface_validate", bad_blocks_total,
|
||||
)
|
||||
|
||||
# 4) Log flush. Chunk what's accumulated since
|
||||
# the last write. log_text concat is quadratic
|
||||
# so the volume reduction from skipping "% done"
|
||||
# lines above matters MORE than the throttle.
|
||||
if pending_log_chunks:
|
||||
chunk = "".join(pending_log_chunks)
|
||||
pending_log_chunks.clear()
|
||||
await _append_stage_log(
|
||||
job_id, "surface_validate", chunk,
|
||||
# 3) Throughput. Skip phase transitions
|
||||
# (per-phase pct resets → negative delta).
|
||||
if (
|
||||
drive_size_bytes
|
||||
and not phase_changed
|
||||
and progress.overall_pct > last_pct_sample
|
||||
and time_since_last_db >= 1.0
|
||||
):
|
||||
d_pct = progress.overall_pct - last_pct_sample
|
||||
bytes_done = (d_pct / 800.0) * drive_size_bytes
|
||||
mbps = bytes_done / time_since_last_db / 1_000_000
|
||||
await _update_stage_bb_mbps(
|
||||
job_id, "surface_validate", mbps,
|
||||
)
|
||||
|
||||
# 4) Log flush.
|
||||
if pending_log_chunks:
|
||||
chunk = "".join(pending_log_chunks)
|
||||
pending_log_chunks.clear()
|
||||
await _append_stage_log(
|
||||
job_id, "surface_validate", chunk,
|
||||
)
|
||||
|
||||
last_pct_sample = progress.overall_pct
|
||||
last_db_write_ts = now_ts
|
||||
await _recalculate_progress(job_id)
|
||||
_push_update()
|
||||
|
||||
# Buffer non-progress lines for the next flush.
|
||||
if not is_progress_line:
|
||||
pending_log_chunks.append(line)
|
||||
|
||||
# Abort on bad block threshold (immediate, not
|
||||
# throttled — finding bad blocks is urgent).
|
||||
if bad_blocks_total > settings.bad_block_threshold:
|
||||
await kill.kill_remote_process(job_id)
|
||||
output_lines.append(
|
||||
f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded "
|
||||
f"threshold ({settings.bad_block_threshold})\n"
|
||||
)
|
||||
|
||||
last_pct_sample = progress.overall_pct
|
||||
last_db_write_ts = now_ts
|
||||
await _recalculate_progress(job_id)
|
||||
_push_update()
|
||||
|
||||
# Buffer non-progress lines for the next flush.
|
||||
if not is_progress_line:
|
||||
pending_log_chunks.append(line)
|
||||
|
||||
# Abort on bad block threshold (immediate, not
|
||||
# throttled — finding bad blocks is urgent).
|
||||
if bad_blocks_total > settings.bad_block_threshold:
|
||||
await kill.kill_remote_process(job_id)
|
||||
output_lines.append(
|
||||
f"\n[ABORTED] {bad_blocks_total} bad block(s) exceeded "
|
||||
f"threshold ({settings.bad_block_threshold})\n"
|
||||
)
|
||||
return
|
||||
return
|
||||
|
||||
await asyncio.gather(
|
||||
_drain(proc.stdout, False),
|
||||
|
|
|
|||
|
|
@ -2951,7 +2951,6 @@ th.sort-desc::after {
|
|||
flex-wrap: wrap;
|
||||
}
|
||||
.drawer-job-finish {
|
||||
margin-left: auto;
|
||||
display: inline-flex;
|
||||
align-items: baseline;
|
||||
gap: 8px;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue