diff --git a/tests/integration/test_logger_buffered_recursion_guard.py b/tests/integration/test_logger_buffered_recursion_guard.py index 152e204214..5bef915b28 100644 --- a/tests/integration/test_logger_buffered_recursion_guard.py +++ b/tests/integration/test_logger_buffered_recursion_guard.py @@ -45,17 +45,11 @@ async def test_logger_buffered_recursion_guard( api_client_connected: APIClientConnectedFactory, ) -> None: """Buffered (non-main-thread) log messages survive a re-entrant main-task log.""" - console_lines: list[str] = [] api_messages: list[str] = [] - test_complete_event = asyncio.Event() - - def line_callback(line: str) -> None: - console_lines.append(line) - if "RACE_TEST_COMPLETE" in line: - test_complete_event.set() + all_drained = asyncio.Event() async with ( - run_compiled(yaml_config, line_callback=line_callback), + run_compiled(yaml_config), api_client_connected() as client, ): device_info = await client.device_info() @@ -65,8 +59,17 @@ async def test_logger_buffered_recursion_guard( # Subscribe over the API: this is the exact path that crashed in the field # (the API log callback runs during the buffered drain). The API message field # preserves embedded newlines, so it reliably exposes a clobbered buffer. + # + # Every buffered thread message is delivered here whether it survives intact or + # gets clobbered (a clobbered message still carries its THREADMSG payload), so + # counting THREADMSG occurrences is a deterministic "drain complete" signal: no + # arbitrary sleep, no dependence on the fix being present. def on_log(msg) -> None: - api_messages.append(msg.message.decode("utf-8", errors="replace")) + text = msg.message.decode("utf-8", errors="replace") + api_messages.append(text) + received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages) + if received >= NUM_MESSAGES: + all_drained.set() client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE) @@ -75,19 +78,16 @@ async def test_logger_buffered_recursion_guard( assert buttons, "Could not find Start Race Test button" client.button_command(buttons[0].key) - # RACE_TEST_COMPLETE is logged from the main task, so it arrives even if the - # buffered path is corrupted. The buffered messages are drained afterwards. + # Wait until every buffered thread message has been delivered over the API. try: - await asyncio.wait_for(test_complete_event.wait(), timeout=30.0) + await asyncio.wait_for(all_drained.wait(), timeout=30.0) except TimeoutError: + received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages) pytest.fail( - "Test did not complete within timeout; device likely crashed or hung. " - f"Collected {len(console_lines)} console lines." + f"Only {received}/{NUM_MESSAGES} thread messages arrived before timeout; " + "device likely crashed or hung." ) - # Allow the buffered messages to drain after the thread joined. - await asyncio.sleep(0.5) - intact: set[int] = set() contaminated: list[str] = [] for raw in api_messages: