From b97182d302ef98da48fc26eb10149bf3d5b1b853 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 18 Jun 2026 16:16:14 -0500 Subject: [PATCH] [logger] Hold recursion guard while draining the task log buffer (#17044) --- esphome/components/logger/logger.cpp | 4 + .../logger_buffered_recursion_guard.yaml | 61 +++++++++ .../test_logger_buffered_recursion_guard.py | 119 ++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 tests/integration/fixtures/logger_buffered_recursion_guard.yaml create mode 100644 tests/integration/test_logger_buffered_recursion_guard.py diff --git a/esphome/components/logger/logger.cpp b/esphome/components/logger/logger.cpp index a035525101..684da0202e 100644 --- a/esphome/components/logger/logger.cpp +++ b/esphome/components/logger/logger.cpp @@ -175,6 +175,10 @@ void Logger::process_messages_() { #ifdef USE_ESPHOME_TASK_LOG_BUFFER // Process any buffered messages when available if (this->log_buffer_.has_messages()) { + // Prevent main-task logs emitted by listener callbacks (e.g. the API send path) from re-entering + // and corrupting the shared tx_buffer_ / API shared_write_buffer_ while we are draining here. + // Mirrors the guard held by log_message_to_buffer_and_send_ on the synchronous logging path. + RecursionGuard guard(this->main_task_recursion_guard_); logger::TaskLogBuffer::LogMessage *message; uint16_t text_length; while (this->log_buffer_.borrow_message_main_loop(message, text_length)) { diff --git a/tests/integration/fixtures/logger_buffered_recursion_guard.yaml b/tests/integration/fixtures/logger_buffered_recursion_guard.yaml new file mode 100644 index 0000000000..058adbff99 --- /dev/null +++ b/tests/integration/fixtures/logger_buffered_recursion_guard.yaml @@ -0,0 +1,61 @@ +esphome: + name: logger-recursion-test +host: +api: +logger: + level: DEBUG + on_message: + # Fires on the main loop for every message delivered to listeners, including + # messages drained from the task log buffer (i.e. 
logged from a non-main thread). + # The lambda logs again on the main task. Without a recursion guard on the buffered + # drain path this re-entrant log reuses the shared tx_buffer_ and clobbers the + # buffered message that is still being delivered, corrupting its console output. + - level: VERY_VERBOSE + then: + - lambda: |- + ESP_LOGD("reentry", "REENTRANT_CLOBBER_MARKER"); + +button: + - platform: template + name: "Start Race Test" + id: start_test_button + on_press: + - lambda: |- + // Keep the count well under the host task-log-buffer slot count so every + // message goes through the ring buffer (buffered drain path) instead of the + // emergency console fallback. The main loop is blocked in pthread_join while + // the thread logs, so all messages are drained together once it returns. + static const int NUM_MESSAGES = 30; + + struct ThreadTest { + static void *thread_func(void *arg) { + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "LogThread"); + #ifdef __APPLE__ + pthread_setname_np(thread_name); + #else + pthread_setname_np(pthread_self(), thread_name); + #endif + + for (int i = 0; i < NUM_MESSAGES; i++) { + // Verifiable payload: data is a deterministic function of the message + // index, so a clobbered buffer shows up as a missing or mismatched line. + ESP_LOGD("thread_test", "THREADMSG%03d_DATA_%08X", i, i * 12345); + } + return nullptr; + } + }; + + // RACE_TEST_START / RACE_TEST_COMPLETE are logged from the main task (the + // synchronous path, which already holds the recursion guard) so the test can + // always detect completion even when the buffered path is corrupted. 
+ ESP_LOGI("thread_test", "RACE_TEST_START: logging %d messages from a thread", NUM_MESSAGES); + + pthread_t thread; + if (pthread_create(&thread, nullptr, ThreadTest::thread_func, nullptr) != 0) { + ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread"); + return; + } + pthread_join(thread, nullptr); + + ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: thread finished, expected %d messages", NUM_MESSAGES); diff --git a/tests/integration/test_logger_buffered_recursion_guard.py b/tests/integration/test_logger_buffered_recursion_guard.py new file mode 100644 index 0000000000..5bef915b28 --- /dev/null +++ b/tests/integration/test_logger_buffered_recursion_guard.py @@ -0,0 +1,119 @@ +"""Integration test for the recursion guard on the buffered logger drain path. + +Regression test for a crash where a log message drained from the task log buffer +(i.e. logged from a non-main thread) re-entered the logger on the main task while it +was still being delivered to listeners. The buffered drain in +``Logger::process_messages_`` did not hold the main-task recursion guard that the +synchronous logging path holds, so a listener callback that logged again on the main +task (e.g. the API log-forwarding path, or a ``logger.on_message`` automation) reused +the shared ``tx_buffer_`` and clobbered the message mid-delivery. On ESP32 this showed +up as a ``StoreProhibited`` panic inside the API send path. + +The fixture logs a small batch of verifiable messages from a non-main thread (kept +under the host task-log-buffer slot count so they all take the buffered drain path +rather than the emergency console fallback) while an ``on_message`` automation re-logs +``REENTRANT_CLOBBER_MARKER`` on the main task for every delivered message. 
+ +Without the guard the re-entrant marker is written into the shared ``tx_buffer_`` while +the buffered thread message is still being delivered, so the message the API receives is +contaminated (it contains the marker and an embedded newline glued onto the thread +payload). With the guard the re-entrant log is dropped during the drain, the marker +never appears, and every thread message is delivered clean. +""" + +from __future__ import annotations + +import asyncio +import re + +from aioesphomeapi import LogLevel +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + +_ANSI = re.compile(r"\x1b\[[0-9;]*m") +# THREADMSGnnn_DATA_xxxxxxxx where the data is index * 12345 as 8 hex digits +THREAD_MSG_PATTERN = re.compile(r"THREADMSG(\d{3})_DATA_([0-9A-F]{8})") + +NUM_MESSAGES = 30 + + +@pytest.mark.asyncio +async def test_logger_buffered_recursion_guard( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Buffered (non-main-thread) log messages survive a re-entrant main-task log.""" + api_messages: list[str] = [] + all_drained = asyncio.Event() + + async with ( + run_compiled(yaml_config), + api_client_connected() as client, + ): + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "logger-recursion-test" + + # Subscribe over the API: this is the exact path that crashed in the field + # (the API log callback runs during the buffered drain). The API message field + # preserves embedded newlines, so it reliably exposes a clobbered buffer. + # + # Every buffered thread message is delivered here whether it survives intact or + # gets clobbered (a clobbered message still carries its THREADMSG payload), so + # counting THREADMSG occurrences is a deterministic "drain complete" signal: no + # arbitrary sleep, no dependence on the fix being present. 
+ def on_log(msg) -> None: + text = msg.message.decode("utf-8", errors="replace") + api_messages.append(text) + received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages) + if received >= NUM_MESSAGES: + all_drained.set() + + client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE) + + entities, _ = await client.list_entities_services() + buttons = [e for e in entities if e.name == "Start Race Test"] + assert buttons, "Could not find Start Race Test button" + client.button_command(buttons[0].key) + + # Wait until every buffered thread message has been delivered over the API. + try: + await asyncio.wait_for(all_drained.wait(), timeout=30.0) + except TimeoutError: + received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages) + pytest.fail( + f"Only {received}/{NUM_MESSAGES} thread messages arrived before timeout; " + "device likely crashed or hung." + ) + + intact: set[int] = set() + contaminated: list[str] = [] + for raw in api_messages: + text = _ANSI.sub("", raw) + if "THREADMSG" not in text: + continue + # A clean thread message is a single line carrying only its own payload. A + # clobbered buffer glues the re-entrant marker (and an embedded newline) onto it. + if "REENTRANT" in text or "\n" in text: + contaminated.append(repr(raw)) + continue + match = THREAD_MSG_PATTERN.search(text) + assert match, f"Unexpected thread message format: {raw!r}" + msg_num = int(match.group(1)) + expected = f"{msg_num * 12345:08X}" + if match.group(2) != expected: + contaminated.append(repr(raw)) + continue + intact.add(msg_num) + + assert not contaminated, ( + "Buffered thread messages were clobbered by a re-entrant main-task log " + "(missing recursion guard on the buffered drain path):\n" + + "\n".join(contaminated[:10]) + ) + assert len(intact) == NUM_MESSAGES, ( + f"Expected {NUM_MESSAGES} intact buffered thread messages over the API, got " + f"{len(intact)}. Missing ids: {sorted(set(range(NUM_MESSAGES)) - intact)}" + )