[logger] Wait deterministically for buffered drain in test

Replace the fixed asyncio.sleep(0.5) with a wait on an asyncio.Event that
fires once all thread messages have arrived over the API. Every buffered
message is delivered whether it survives intact or gets clobbered, so counting
THREADMSG occurrences is a deterministic drain-complete signal with no arbitrary
sleep and no dependence on the fix being present.
This commit is contained in:
J. Nick Koston
2026-06-18 14:51:04 -05:00
parent c694e96f24
commit 0b37b9a202

View File

@@ -45,17 +45,11 @@ async def test_logger_buffered_recursion_guard(
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Buffered (non-main-thread) log messages survive a re-entrant main-task log."""
console_lines: list[str] = []
api_messages: list[str] = []
test_complete_event = asyncio.Event()
def line_callback(line: str) -> None:
console_lines.append(line)
if "RACE_TEST_COMPLETE" in line:
test_complete_event.set()
all_drained = asyncio.Event()
async with (
run_compiled(yaml_config, line_callback=line_callback),
run_compiled(yaml_config),
api_client_connected() as client,
):
device_info = await client.device_info()
@@ -65,8 +59,17 @@ async def test_logger_buffered_recursion_guard(
# Subscribe over the API: this is the exact path that crashed in the field
# (the API log callback runs during the buffered drain). The API message field
# preserves embedded newlines, so it reliably exposes a clobbered buffer.
#
# Every buffered thread message is delivered here whether it survives intact or
# gets clobbered (a clobbered message still carries its THREADMSG payload), so
# counting THREADMSG occurrences is a deterministic "drain complete" signal: no
# arbitrary sleep, no dependence on the fix being present.
def on_log(msg) -> None:
api_messages.append(msg.message.decode("utf-8", errors="replace"))
text = msg.message.decode("utf-8", errors="replace")
api_messages.append(text)
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
if received >= NUM_MESSAGES:
all_drained.set()
client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE)
@@ -75,19 +78,16 @@ async def test_logger_buffered_recursion_guard(
assert buttons, "Could not find Start Race Test button"
client.button_command(buttons[0].key)
# RACE_TEST_COMPLETE is logged from the main task, so it arrives even if the
# buffered path is corrupted. The buffered messages are drained afterwards.
# Wait until every buffered thread message has been delivered over the API.
try:
await asyncio.wait_for(test_complete_event.wait(), timeout=30.0)
await asyncio.wait_for(all_drained.wait(), timeout=30.0)
except TimeoutError:
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
pytest.fail(
"Test did not complete within timeout; device likely crashed or hung. "
f"Collected {len(console_lines)} console lines."
f"Only {received}/{NUM_MESSAGES} thread messages arrived before timeout; "
"device likely crashed or hung."
)
# Allow the buffered messages to drain after the thread joined.
await asyncio.sleep(0.5)
intact: set[int] = set()
contaminated: list[str] = []
for raw in api_messages: