[api] Don't tear down log connection on stack-trace decode failure (#16196)

This commit is contained in:
J. Nick Koston
2026-05-03 20:05:13 -05:00
committed by GitHub
parent 120d1e51fb
commit 9ddb828da3
3 changed files with 163 additions and 11 deletions

View File

@@ -18,7 +18,7 @@ with warnings.catch_warnings():
import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE
from esphome.core import CORE, EsphomeError
from esphome.platformio_api import process_stacktrace
from . import CONF_ENCRYPTION
@@ -32,6 +32,52 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
class _LogLineProcessor:
"""Feeds incoming log lines to the stack-trace decoder.
Two responsibilities beyond just calling the decoder:
1. Catch EsphomeError. on_log runs inside an asyncio protocol
callback; if an exception escapes, the loop tears the transport
down with "Fatal error: protocol.data_received() call failed."
and ReconnectLogic immediately reconnects, the device replays
the same crash trace, and we loop forever.
2. Disable decoding after the first failure. _decode_pc shells out
to PlatformIO via _run_idedata, which is expensive; a single
crash dump can contain many PC/BT lines and we don't want to
retry the failing subprocess for each one.
"""
def __init__(self, config: dict[str, Any], platform_handler: Any | None) -> None:
self._config = config
self._platform_handler = platform_handler
self._decode_enabled = True
self.backtrace_state = False
def process_line(self, raw_line: str) -> None:
if not self._decode_enabled:
return
try:
if self._platform_handler is not None:
self.backtrace_state = self._platform_handler(
self._config, raw_line, self.backtrace_state
)
else:
self.backtrace_state = process_stacktrace(
self._config, raw_line, backtrace_state=self.backtrace_state
)
except EsphomeError as exc:
self._decode_enabled = False
self.backtrace_state = False
# _run_idedata raises EsphomeError with no message; fall back
# to a generic explanation when str(exc) is empty.
detail = str(exc) or "build artifacts not found locally"
_LOGGER.warning(
"Crash trace decoding unavailable: %s. "
"Run 'esphome compile' for this device to enable PC decoding.",
detail,
)
async def async_run_logs(
config: dict[str, Any],
addresses: list[str],
@@ -61,7 +107,6 @@ async def async_run_logs(
addresses=addresses, # Pass all addresses for automatic retry
)
dashboard = CORE.dashboard
backtrace_state = False
# Try platform-specific stacktrace handler first, fall back to generic
platform_process_stacktrace = None
@@ -71,9 +116,10 @@ async def async_run_logs(
except (AttributeError, ImportError):
pass
processor = _LogLineProcessor(config, platform_process_stacktrace)
def on_log(msg: SubscribeLogsResponse) -> None:
"""Handle a new log message."""
nonlocal backtrace_state
time_ = datetime.now()
message: bytes = msg.message
text = message.decode("utf8", "backslashreplace")
@@ -84,14 +130,7 @@ async def async_run_logs(
for parsed_msg in parse_log_message(text, timestamp):
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
for raw_line in text.splitlines():
if platform_process_stacktrace:
backtrace_state = platform_process_stacktrace(
config, raw_line, backtrace_state
)
else:
backtrace_state = process_stacktrace(
config, raw_line, backtrace_state=backtrace_state
)
processor.process_line(raw_line)
# Safe to fall back to plaintext here only for this diagnostics use
# case: the stream is one-way from device to client, and this code

View File

@@ -0,0 +1,113 @@
"""Tests for esphome.components.api.client."""
from __future__ import annotations
from unittest.mock import patch
from esphome.components.api import client as api_client
from esphome.core import EsphomeError
def test_decoder_swallows_esphome_error() -> None:
"""A failing stack-trace decode must not propagate.
on_log runs inside an asyncio protocol callback; if EsphomeError
escapes, the loop reports "Fatal error: protocol.data_received()
call failed.", tears the connection down, and ReconnectLogic loops
forever as the device replays the same crash trace on every
reconnect.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=EsphomeError("no idedata")
) as mock_process:
processor.process_line("PC: 0x4010496e")
assert mock_process.called
assert processor.backtrace_state is False
def test_decoder_swallows_platform_handler_error() -> None:
"""The same protection must apply to the platform-specific handler."""
config = {"esphome": {"name": "test"}}
def platform_handler(_config, _line, _state):
raise EsphomeError("no idedata")
processor = api_client._LogLineProcessor(config, platform_handler)
processor.process_line("PC: 0x4010496e")
assert processor.backtrace_state is False
def test_decoder_warning_uses_fallback_for_empty_error(caplog) -> None:
"""_run_idedata raises EsphomeError with no message; the warning
must show a useful explanation rather than empty parens.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(api_client, "process_stacktrace", side_effect=EsphomeError()):
processor.process_line("PC: 0x4010496e")
warnings = [r.message for r in caplog.records if r.levelname == "WARNING"]
assert any("build artifacts not found locally" in m for m in warnings)
assert not any("()" in m for m in warnings)
def test_decoder_short_circuits_after_failure() -> None:
"""After one failure, subsequent lines must not retry the decoder.
_decode_pc shells out to PlatformIO; a crash dump can contain many
PC/BT lines and retrying the failing subprocess for each one would
stall log streaming.
"""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=EsphomeError("no idedata")
) as mock_process:
processor.process_line("PC: 0x4010496e")
processor.process_line("BT0: 0x4010496e")
processor.process_line("BT1: 0x401049aa")
assert mock_process.call_count == 1
def test_decoder_threads_backtrace_state() -> None:
"""When decoding succeeds, backtrace_state is threaded across calls."""
config = {"esphome": {"name": "test"}}
processor = api_client._LogLineProcessor(config, None)
with patch.object(
api_client, "process_stacktrace", side_effect=[True, False]
) as mock_process:
processor.process_line(">>>stack>>>")
assert processor.backtrace_state is True
processor.process_line("<<<stack<<<")
assert processor.backtrace_state is False
assert mock_process.call_args_list[0].kwargs == {"backtrace_state": False}
assert mock_process.call_args_list[1].kwargs == {"backtrace_state": True}
def test_decoder_uses_platform_handler_when_provided() -> None:
"""The platform handler is preferred over the generic one."""
config = {"esphome": {"name": "test"}}
calls: list[tuple[object, str, bool]] = []
def platform_handler(cfg, line, state):
calls.append((cfg, line, state))
return True
processor = api_client._LogLineProcessor(config, platform_handler)
with patch.object(api_client, "process_stacktrace") as mock_generic:
processor.process_line("BT0: 0x4010496e")
assert calls == [(config, "BT0: 0x4010496e", False)]
assert mock_generic.called is False
assert processor.backtrace_state is True