diff --git a/esphome/components/api/client.py b/esphome/components/api/client.py index 312d937f01..d5214ccbf6 100644 --- a/esphome/components/api/client.py +++ b/esphome/components/api/client.py @@ -18,7 +18,7 @@ with warnings.catch_warnings(): import contextlib from esphome.const import CONF_KEY, CONF_PORT, __version__ -from esphome.core import CORE +from esphome.core import CORE, EsphomeError from esphome.platformio_api import process_stacktrace from . import CONF_ENCRYPTION @@ -32,6 +32,52 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) +class _LogLineProcessor: + """Feeds incoming log lines to the stack-trace decoder. + + Two responsibilities beyond just calling the decoder: + 1. Catch EsphomeError. on_log runs inside an asyncio protocol + callback; if an exception escapes, the loop tears the transport + down with "Fatal error: protocol.data_received() call failed." + and ReconnectLogic immediately reconnects, the device replays + the same crash trace, and we loop forever. + 2. Disable decoding after the first failure. _decode_pc shells out + to PlatformIO via _run_idedata, which is expensive; a single + crash dump can contain many PC/BT lines and we don't want to + retry the failing subprocess for each one. + """ + + def __init__(self, config: dict[str, Any], platform_handler: Any | None) -> None: + self._config = config + self._platform_handler = platform_handler + self._decode_enabled = True + self.backtrace_state = False + + def process_line(self, raw_line: str) -> None: + if not self._decode_enabled: + return + try: + if self._platform_handler is not None: + self.backtrace_state = self._platform_handler( + self._config, raw_line, self.backtrace_state + ) + else: + self.backtrace_state = process_stacktrace( + self._config, raw_line, backtrace_state=self.backtrace_state + ) + except EsphomeError as exc: + self._decode_enabled = False + self.backtrace_state = False + # _run_idedata raises EsphomeError with no message; fall back + # to a generic explanation when str(exc) is empty. + detail = str(exc) or "build artifacts not found locally" + _LOGGER.warning( + "Crash trace decoding unavailable: %s. " + "Run 'esphome compile' for this device to enable PC decoding.", + detail, + ) + + async def async_run_logs( config: dict[str, Any], addresses: list[str], @@ -61,7 +107,6 @@ async def async_run_logs( addresses=addresses, # Pass all addresses for automatic retry ) dashboard = CORE.dashboard - backtrace_state = False # Try platform-specific stacktrace handler first, fall back to generic platform_process_stacktrace = None @@ -71,9 +116,10 @@ async def async_run_logs( except (AttributeError, ImportError): pass + processor = _LogLineProcessor(config, platform_process_stacktrace) + def on_log(msg: SubscribeLogsResponse) -> None: """Handle a new log message.""" - nonlocal backtrace_state time_ = datetime.now() message: bytes = msg.message text = message.decode("utf8", "backslashreplace") @@ -84,14 +130,7 @@ async def async_run_logs( for parsed_msg in parse_log_message(text, timestamp): print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg) for raw_line in text.splitlines(): - if platform_process_stacktrace: - backtrace_state = platform_process_stacktrace( - config, raw_line, backtrace_state - ) - else: - backtrace_state = process_stacktrace( - config, raw_line, backtrace_state=backtrace_state - ) + processor.process_line(raw_line) # Safe to fall back to plaintext here only for this diagnostics use # case: the stream is one-way from device to client, and this code diff --git a/tests/unit_tests/components/api/__init__.py b/tests/unit_tests/components/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit_tests/components/api/test_client.py b/tests/unit_tests/components/api/test_client.py new file mode 100644 index 0000000000..3970d1ce8b --- /dev/null +++ b/tests/unit_tests/components/api/test_client.py @@ -0,0 +1,113 @@ +"""Tests for esphome.components.api.client.""" + +from __future__ import annotations + +from unittest.mock import patch + +from esphome.components.api import client as api_client +from esphome.core import EsphomeError + + +def test_decoder_swallows_esphome_error() -> None: + """A failing stack-trace decode must not propagate. + + on_log runs inside an asyncio protocol callback; if EsphomeError + escapes, the loop reports "Fatal error: protocol.data_received() + call failed.", tears the connection down, and ReconnectLogic loops + forever as the device replays the same crash trace on every + reconnect. + """ + config = {"esphome": {"name": "test"}} + processor = api_client._LogLineProcessor(config, None) + + with patch.object( + api_client, "process_stacktrace", side_effect=EsphomeError("no idedata") + ) as mock_process: + processor.process_line("PC: 0x4010496e") + + assert mock_process.called + assert processor.backtrace_state is False + + +def test_decoder_swallows_platform_handler_error() -> None: + """The same protection must apply to the platform-specific handler.""" + config = {"esphome": {"name": "test"}} + + def platform_handler(_config, _line, _state): + raise EsphomeError("no idedata") + + processor = api_client._LogLineProcessor(config, platform_handler) + processor.process_line("PC: 0x4010496e") + + assert processor.backtrace_state is False + + +def test_decoder_warning_uses_fallback_for_empty_error(caplog) -> None: + """_run_idedata raises EsphomeError with no message; the warning + must show a useful explanation rather than empty parens. + """ + config = {"esphome": {"name": "test"}} + processor = api_client._LogLineProcessor(config, None) + + with patch.object(api_client, "process_stacktrace", side_effect=EsphomeError()): + processor.process_line("PC: 0x4010496e") + + warnings = [r.message for r in caplog.records if r.levelname == "WARNING"] + assert any("build artifacts not found locally" in m for m in warnings) + assert not any("()" in m for m in warnings) + + +def test_decoder_short_circuits_after_failure() -> None: + """After one failure, subsequent lines must not retry the decoder. + + _decode_pc shells out to PlatformIO; a crash dump can contain many + PC/BT lines and retrying the failing subprocess for each one would + stall log streaming. + """ + config = {"esphome": {"name": "test"}} + processor = api_client._LogLineProcessor(config, None) + + with patch.object( + api_client, "process_stacktrace", side_effect=EsphomeError("no idedata") + ) as mock_process: + processor.process_line("PC: 0x4010496e") + processor.process_line("BT0: 0x4010496e") + processor.process_line("BT1: 0x401049aa") + + assert mock_process.call_count == 1 + + +def test_decoder_threads_backtrace_state() -> None: + """When decoding succeeds, backtrace_state is threaded across calls.""" + config = {"esphome": {"name": "test"}} + processor = api_client._LogLineProcessor(config, None) + + with patch.object( + api_client, "process_stacktrace", side_effect=[True, False] + ) as mock_process: + processor.process_line(">>>stack>>>") + assert processor.backtrace_state is True + processor.process_line("<< None: + """The platform handler is preferred over the generic one.""" + config = {"esphome": {"name": "test"}} + calls: list[tuple[object, str, bool]] = [] + + def platform_handler(cfg, line, state): + calls.append((cfg, line, state)) + return True + + processor = api_client._LogLineProcessor(config, platform_handler) + + with patch.object(api_client, "process_stacktrace") as mock_generic: + processor.process_line("BT0: 0x4010496e") + + assert calls == [(config, "BT0: 0x4010496e", False)] + assert mock_generic.called is False + assert processor.backtrace_state is True