mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 09:57:43 +00:00
[logger] Hold recursion guard while draining the task log buffer (#17044)
This commit is contained in:
committed by
Jesse Hills
parent
d27229a1c7
commit
20cd6a1771
@@ -175,6 +175,10 @@ void Logger::process_messages_() {
|
||||
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
|
||||
// Process any buffered messages when available
|
||||
if (this->log_buffer_.has_messages()) {
|
||||
// Prevent main-task logs emitted by listener callbacks (e.g. the API send path) from re-entering
|
||||
// and corrupting the shared tx_buffer_ / API shared_write_buffer_ while we are draining here.
|
||||
// Mirrors the guard held by log_message_to_buffer_and_send_ on the synchronous logging path.
|
||||
RecursionGuard guard(this->main_task_recursion_guard_);
|
||||
logger::TaskLogBuffer::LogMessage *message;
|
||||
uint16_t text_length;
|
||||
while (this->log_buffer_.borrow_message_main_loop(message, text_length)) {
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
esphome:
|
||||
name: logger-recursion-test
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: DEBUG
|
||||
on_message:
|
||||
# Fires on the main loop for every message delivered to listeners, including
|
||||
# messages drained from the task log buffer (i.e. logged from a non-main thread).
|
||||
# The lambda logs again on the main task. Without a recursion guard on the buffered
|
||||
# drain path this re-entrant log reuses the shared tx_buffer_ and clobbers the
|
||||
# buffered message that is still being delivered, corrupting its console output.
|
||||
- level: VERY_VERBOSE
|
||||
then:
|
||||
- lambda: |-
|
||||
ESP_LOGD("reentry", "REENTRANT_CLOBBER_MARKER");
|
||||
|
||||
button:
|
||||
- platform: template
|
||||
name: "Start Race Test"
|
||||
id: start_test_button
|
||||
on_press:
|
||||
- lambda: |-
|
||||
// Keep the count well under the host task-log-buffer slot count so every
|
||||
// message goes through the ring buffer (buffered drain path) instead of the
|
||||
// emergency console fallback. The main loop is blocked in pthread_join while
|
||||
// the thread logs, so all messages are drained together once it returns.
|
||||
static const int NUM_MESSAGES = 30;
|
||||
|
||||
struct ThreadTest {
|
||||
static void *thread_func(void *arg) {
|
||||
char thread_name[16];
|
||||
snprintf(thread_name, sizeof(thread_name), "LogThread");
|
||||
#ifdef __APPLE__
|
||||
pthread_setname_np(thread_name);
|
||||
#else
|
||||
pthread_setname_np(pthread_self(), thread_name);
|
||||
#endif
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||
// Verifiable payload: data is a deterministic function of the message
|
||||
// index, so a clobbered buffer shows up as a missing or mismatched line.
|
||||
ESP_LOGD("thread_test", "THREADMSG%03d_DATA_%08X", i, i * 12345);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
// RACE_TEST_START / RACE_TEST_COMPLETE are logged from the main task (the
|
||||
// synchronous path, which already holds the recursion guard) so the test can
|
||||
// always detect completion even when the buffered path is corrupted.
|
||||
ESP_LOGI("thread_test", "RACE_TEST_START: logging %d messages from a thread", NUM_MESSAGES);
|
||||
|
||||
pthread_t thread;
|
||||
if (pthread_create(&thread, nullptr, ThreadTest::thread_func, nullptr) != 0) {
|
||||
ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread");
|
||||
return;
|
||||
}
|
||||
pthread_join(thread, nullptr);
|
||||
|
||||
ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: thread finished, expected %d messages", NUM_MESSAGES);
|
||||
119
tests/integration/test_logger_buffered_recursion_guard.py
Normal file
119
tests/integration/test_logger_buffered_recursion_guard.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Integration test for the recursion guard on the buffered logger drain path.
|
||||
|
||||
Regression test for a crash where a log message drained from the task log buffer
|
||||
(i.e. logged from a non-main thread) re-entered the logger on the main task while it
|
||||
was still being delivered to listeners. The buffered drain in
|
||||
``Logger::process_messages_`` did not hold the main-task recursion guard that the
|
||||
synchronous logging path holds, so a listener callback that logged again on the main
|
||||
task (e.g. the API log-forwarding path, or a ``logger.on_message`` automation) reused
|
||||
the shared ``tx_buffer_`` and clobbered the message mid-delivery. On ESP32 this showed
|
||||
up as a ``StoreProhibited`` panic inside the API send path.
|
||||
|
||||
The fixture logs a small batch of verifiable messages from a non-main thread (kept
|
||||
under the host task-log-buffer slot count so they all take the buffered drain path
|
||||
rather than the emergency console fallback) while an ``on_message`` automation re-logs
|
||||
``REENTRANT_CLOBBER_MARKER`` on the main task for every delivered message.
|
||||
|
||||
Without the guard the re-entrant marker is written into the shared ``tx_buffer_`` while
|
||||
the buffered thread message is still being delivered, so the message the API receives is
|
||||
contaminated (it contains the marker and an embedded newline glued onto the thread
|
||||
payload). With the guard the re-entrant log is dropped during the drain, the marker
|
||||
never appears, and every thread message is delivered clean.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
from aioesphomeapi import LogLevel
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
_ANSI = re.compile(r"\x1b\[[0-9;]*m")
|
||||
# THREADMSGnnn_DATA_xxxxxxxx where data is a deterministic checksum of the index
|
||||
THREAD_MSG_PATTERN = re.compile(r"THREADMSG(\d{3})_DATA_([0-9A-F]{8})")
|
||||
|
||||
NUM_MESSAGES = 30
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logger_buffered_recursion_guard(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Buffered (non-main-thread) log messages survive a re-entrant main-task log."""
|
||||
api_messages: list[str] = []
|
||||
all_drained = asyncio.Event()
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "logger-recursion-test"
|
||||
|
||||
# Subscribe over the API: this is the exact path that crashed in the field
|
||||
# (the API log callback runs during the buffered drain). The API message field
|
||||
# preserves embedded newlines, so it reliably exposes a clobbered buffer.
|
||||
#
|
||||
# Every buffered thread message is delivered here whether it survives intact or
|
||||
# gets clobbered (a clobbered message still carries its THREADMSG payload), so
|
||||
# counting THREADMSG occurrences is a deterministic "drain complete" signal: no
|
||||
# arbitrary sleep, no dependence on the fix being present.
|
||||
def on_log(msg) -> None:
|
||||
text = msg.message.decode("utf-8", errors="replace")
|
||||
api_messages.append(text)
|
||||
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
|
||||
if received >= NUM_MESSAGES:
|
||||
all_drained.set()
|
||||
|
||||
client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE)
|
||||
|
||||
entities, _ = await client.list_entities_services()
|
||||
buttons = [e for e in entities if e.name == "Start Race Test"]
|
||||
assert buttons, "Could not find Start Race Test button"
|
||||
client.button_command(buttons[0].key)
|
||||
|
||||
# Wait until every buffered thread message has been delivered over the API.
|
||||
try:
|
||||
await asyncio.wait_for(all_drained.wait(), timeout=30.0)
|
||||
except TimeoutError:
|
||||
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
|
||||
pytest.fail(
|
||||
f"Only {received}/{NUM_MESSAGES} thread messages arrived before timeout; "
|
||||
"device likely crashed or hung."
|
||||
)
|
||||
|
||||
intact: set[int] = set()
|
||||
contaminated: list[str] = []
|
||||
for raw in api_messages:
|
||||
text = _ANSI.sub("", raw)
|
||||
if "THREADMSG" not in text:
|
||||
continue
|
||||
# A clean thread message is a single line carrying only its own payload. A
|
||||
# clobbered buffer glues the re-entrant marker (and an embedded newline) onto it.
|
||||
if "REENTRANT" in text or "\n" in text:
|
||||
contaminated.append(repr(raw))
|
||||
continue
|
||||
match = THREAD_MSG_PATTERN.search(text)
|
||||
assert match, f"Unexpected thread message format: {raw!r}"
|
||||
msg_num = int(match.group(1))
|
||||
expected = f"{msg_num * 12345:08X}"
|
||||
if match.group(2) != expected:
|
||||
contaminated.append(repr(raw))
|
||||
continue
|
||||
intact.add(msg_num)
|
||||
|
||||
assert not contaminated, (
|
||||
"Buffered thread messages were clobbered by a re-entrant main-task log "
|
||||
"(missing recursion guard on the buffered drain path):\n"
|
||||
+ "\n".join(contaminated[:10])
|
||||
)
|
||||
assert len(intact) == NUM_MESSAGES, (
|
||||
f"Expected {NUM_MESSAGES} intact buffered thread messages over the API, got "
|
||||
f"{len(intact)}. Missing ids: {sorted(set(range(NUM_MESSAGES)) - intact)}"
|
||||
)
|
||||
Reference in New Issue
Block a user