diff --git a/esphome/__main__.py b/esphome/__main__.py index 680de02201..416c1160b7 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -28,13 +28,13 @@ from esphome.const import ( ALLOWED_NAME_CHARS, ARGUMENT_HELP_DEVICE, CONF_API, - CONF_AUTH, CONF_BAUD_RATE, CONF_BROKER, CONF_DEASSERT_RTS_DTR, CONF_DISABLED, CONF_ESPHOME, CONF_LEVEL, + CONF_LOG, CONF_LOG_TOPIC, CONF_LOGGER, CONF_MDNS, @@ -48,7 +48,7 @@ from esphome.const import ( CONF_PORT, CONF_SUBSTITUTIONS, CONF_TOPIC, - CONF_USERNAME, + CONF_VERSION, CONF_WEB_SERVER, CONF_WIFI, ENV_NOGITIGNORE, @@ -280,8 +280,8 @@ def _unresolved_default_error(purpose: Purpose, defaults: list[str]) -> str: if purpose == Purpose.LOGGING and not has_api(): return ( "Cannot view logs over the network: no 'api:' component is " - "configured. Network log streaming requires the native API; add " - "an 'api:' component, enable MQTT logging, or view logs over USB." + "configured. Add an 'api:' component, enable MQTT logging, add a " + "'web_server:' component, or view logs over USB." ) if purpose == Purpose.UPLOADING and not has_ota(): return ( @@ -321,9 +321,12 @@ def choose_upload_log_host( ] resolved.append(choose_prompt(options, purpose=purpose)) elif device == "OTA": + # Logs can stream over a network transport via the native API + # or the web_server HTTP SSE feed. + network_logging = has_api() or has_web_server_logging() # ensure IP adresses are used first if is_ip_address(CORE.address) and ( - (purpose == Purpose.LOGGING and has_api()) + (purpose == Purpose.LOGGING and network_logging) or (purpose == Purpose.UPLOADING and has_ota()) ): resolved.extend(_resolve_with_cache(CORE.address, purpose)) @@ -335,7 +338,11 @@ def choose_upload_log_host( if has_mqtt_logging(): resolved.append("MQTT") - if has_api() and has_non_ip_address() and has_resolvable_address(): + if ( + network_logging + and has_non_ip_address() + and has_resolvable_address() + ): resolved.extend(_ota_hostnames_for_default(purpose)) elif purpose == Purpose.UPLOADING: @@ -397,7 +404,7 @@ def choose_upload_log_host( mqtt_config = CORE.config[CONF_MQTT] options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT")) - if has_api(): + if has_api() or has_web_server_logging(): add_ota_options() elif purpose == Purpose.UPLOADING and has_ota(): @@ -490,6 +497,21 @@ def has_web_server_ota() -> bool: ) +def has_web_server_logging() -> bool: + """Check if logs can be streamed over the web_server HTTP SSE endpoint. + + The ``web_server`` component exposes a ``/events`` Server-Sent Events + stream that carries ``event: log`` frames. This requires version 2+ (the + v1 UI has no ``/events`` endpoint) and the ``log`` option enabled (default). + """ + web_conf = CORE.config.get(CONF_WEB_SERVER) + if web_conf is None: + return False + if web_conf.get(CONF_VERSION, 2) == 1: + return False + return web_conf.get(CONF_LOG, True) + + def has_mqtt_ip_lookup() -> bool: """Check if MQTT is available and IP lookup is supported.""" from esphome.components.mqtt import CONF_DISCOVER_IP @@ -1290,25 +1312,23 @@ def _upload_via_native_api( def _upload_via_web_server( config: ConfigType, network_devices: list[str], binary: Path ) -> tuple[int, str | None]: - web_conf = config.get(CONF_WEB_SERVER) - if not web_conf: - raise EsphomeError( - f"Cannot upload via web_server OTA: the {CONF_WEB_SERVER} component " - f"is not configured." - ) - - remote_port = int(web_conf[CONF_PORT]) - auth = web_conf.get(CONF_AUTH) or {} - username = auth.get(CONF_USERNAME) - password = auth.get(CONF_PASSWORD) - from esphome import web_server_ota + from esphome.web_server_helpers import get_web_server_connection + remote_port, username, password = get_web_server_connection(config) return web_server_ota.run_ota( network_devices, remote_port, username, password, binary ) +def _show_logs_via_web_server(config: ConfigType, network_devices: list[str]) -> int: + from esphome import web_server_logs + from esphome.web_server_helpers import get_web_server_connection + + port, username, password = get_web_server_connection(config) + return web_server_logs.run_logs(network_devices, port, username, password) + + # Layout of esp_partition_info_t on flash. Each entry is 32 bytes, leading with a # 16-bit little-endian magic. ESP-IDF defines ESP_PARTITION_MAGIC = 0x50AA (stored as # bytes 0xAA, 0x50) for partition entries and ESP_PARTITION_MAGIC_MD5 = 0xEBEB for the @@ -1437,6 +1457,13 @@ def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int config, args.topic, args.username, args.password, args.client_id ) + # Fall back to the web_server HTTP SSE log stream for devices that have + # web_server: but no api: (the logging counterpart to web_server OTA). + if has_web_server_logging() and ( + network_devices := _resolve_network_devices(devices, config, args) + ): + return _show_logs_via_web_server(config, network_devices) + raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)") diff --git a/esphome/helpers.py b/esphome/helpers.py index ef7e2d0b93..d5be1d607b 100644 --- a/esphome/helpers.py +++ b/esphome/helpers.py @@ -328,6 +328,24 @@ def resolve_ip_address( return res +def format_ip_url(family: int, sockaddr: tuple, port: int, path: str) -> str: + """Build an ``http://host:port/path`` URL for a resolved address. + + ``family``/``sockaddr`` come from a :func:`resolve_ip_address` entry. IPv6 + literals must be wrapped in brackets in URLs; link-local addresses need a + percent-encoded zone index per RFC 6874. + """ + import socket + + ip = sockaddr[0] + if family == socket.AF_INET6: + scope = sockaddr[3] if len(sockaddr) >= 4 else 0 + host_part = f"[{ip}%25{scope}]" if scope else f"[{ip}]" + else: + host_part = ip + return f"http://{host_part}:{port}{path}" + + def sort_ip_addresses(address_list: list[str]) -> list[str]: """Takes a list of IP addresses in string form, e.g. from mDNS or MQTT, and sorts them into the best order to actually try connecting to them. diff --git a/esphome/web_server_helpers.py b/esphome/web_server_helpers.py new file mode 100644 index 0000000000..f48934b185 --- /dev/null +++ b/esphome/web_server_helpers.py @@ -0,0 +1,43 @@ +"""Shared helpers for the web_server HTTP transports (OTA upload and logs).""" + +from __future__ import annotations + +from esphome.const import ( + CONF_AUTH, + CONF_PASSWORD, + CONF_PORT, + CONF_USERNAME, + CONF_WEB_SERVER, +) +from esphome.core import CORE, EsphomeError +from esphome.helpers import format_ip_url, resolve_ip_address +from esphome.types import ConfigType + + +def resolve_web_server_urls(host: str, port: int, path: str) -> list[tuple[str, str]]: + """Resolve ``host`` to ``(ip, url)`` pairs for the web_server ``path``. + + Wraps :func:`resolve_ip_address` (honoring ``CORE.address_cache``) and + formats each resolved address into an ``http://host:port/path`` URL via + :func:`format_ip_url`, handling both IPv4 and IPv6. Shared by the + web_server OTA upload and log streaming paths. + """ + addr_infos = resolve_ip_address(host, port, address_cache=CORE.address_cache) + return [ + (sockaddr[0], format_ip_url(family, sockaddr, port, path)) + for family, _socktype, _, _, sockaddr in addr_infos + ] + + +def get_web_server_connection(config: ConfigType) -> tuple[int, str | None, str | None]: + """Return ``(port, username, password)`` for the web_server HTTP endpoint. + + Reads the port and optional HTTP Basic-auth credentials from the validated + ``web_server:`` config, shared by the web_server OTA upload and log + streaming paths. Raises :class:`EsphomeError` if ``web_server`` is absent. + """ + web_conf = config.get(CONF_WEB_SERVER) + if not web_conf: + raise EsphomeError(f"The {CONF_WEB_SERVER} component is not configured.") + auth = web_conf.get(CONF_AUTH) or {} + return int(web_conf[CONF_PORT]), auth.get(CONF_USERNAME), auth.get(CONF_PASSWORD) diff --git a/esphome/web_server_logs.py b/esphome/web_server_logs.py new file mode 100644 index 0000000000..e091e24bb7 --- /dev/null +++ b/esphome/web_server_logs.py @@ -0,0 +1,189 @@ +"""Stream device logs over the ``web_server`` component's HTTP SSE endpoint. + +The ``web_server`` component exposes a Server-Sent Events stream at ``/events`` +that multiplexes entity state, keepalive pings, and log lines (``event: log``). +This is the logging counterpart to the web_server OTA upload path +(:mod:`esphome.web_server_ota`); it lets ``esphome logs`` reach a device that +has ``web_server:`` configured but no ``api:``. + +Only the ``event: log`` frames are rendered; the payload is the device's +already-formatted, ANSI-colored log line, so it is passed through the same +``LogParser`` + ``safe_print`` path the serial and native-API log viewers use. +The stream is long-lived and the server drops idle connections, so the reader +reconnects automatically until interrupted. +""" + +from __future__ import annotations + +from datetime import datetime +import logging +import time +from typing import TYPE_CHECKING + +import requests +from requests.auth import HTTPBasicAuth + +from esphome.core import EsphomeError +from esphome.util import safe_print +from esphome.web_server_helpers import resolve_web_server_urls + +if TYPE_CHECKING: + from aioesphomeapi import LogParser + +_LOGGER = logging.getLogger(__name__) + +EVENTS_PATH = "/events" +# (connect_timeout, read_timeout). The device sends a keepalive ``ping`` every +# 10s, so a 30s read timeout tolerates a few missed pings before we treat the +# connection as dead and reconnect. +TIMEOUT = (10.0, 30.0) +# Pause between reconnect attempts so a downed device doesn't spin the CPU. +RECONNECT_DELAY = 1.0 +# Upper bound for the exponential backoff applied to consecutive failures, so an +# unreachable host backs off instead of retrying (and logging) once a second. +MAX_RECONNECT_DELAY = 10.0 + + +class WebServerLogsError(EsphomeError): + """Raised when the web_server log stream cannot be used (e.g. bad auth).""" + + +def _build_urls(hosts: list[str], port: int) -> list[tuple[str, str]]: + """Resolve ``hosts`` to ``(ip, url)`` pairs for the ``/events`` endpoint.""" + urls: list[tuple[str, str]] = [] + seen: set[str] = set() + for host in hosts: + try: + resolved = resolve_web_server_urls(host, port, EVENTS_PATH) + except EsphomeError as err: + _LOGGER.warning("Error resolving IP address of %s: %s", host, err) + continue + for ip, url in resolved: + if url not in seen: + seen.add(url) + urls.append((ip, url)) + return urls + + +def _emit(data_lines: list[str], parser: LogParser) -> None: + """Render the accumulated ``data:`` lines of one ``event: log`` frame.""" + time_ = datetime.now().astimezone() + milliseconds = time_.microsecond // 1000 + time_str = ( + f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]" + ) + for line in data_lines: + safe_print(parser.parse_line(line, time_str)) + + +def _consume(response: requests.Response, parser: LogParser) -> None: + """Parse the SSE stream, rendering only ``event: log`` frames. + + Implements the minimal slice of the SSE grammar the ``web_server`` stream + uses: ``field: value`` lines (with one optional leading space after the + colon) accumulated until a blank line dispatches the frame. ``id:``, + ``retry:``, and comment (``:``) lines are ignored, as are non-``log`` + events (``ping``, ``state``, ...). + """ + event_type = "message" + data_lines: list[str] = [] + # Iterate bytes and decode as UTF-8 ourselves (matching run_miniterm); the + # text/event-stream response has no charset, so requests' decode_unicode + # would fall back to Latin-1 and mojibake UTF-8 log characters. + for raw in response.iter_lines(): + line = raw.decode("utf8", "backslashreplace") + if not line: + if event_type == "log" and data_lines: + _emit(data_lines, parser) + event_type = "message" + data_lines = [] + continue + if line.startswith(":"): + continue + field, _, value = line.partition(":") + value = value.removeprefix(" ") + if field == "event": + event_type = value + elif field == "data": + data_lines.append(value) + + +def _stream(url: str, ip: str, auth: HTTPBasicAuth | None, parser: LogParser) -> bool: + """Connect and stream one session. + + Returns ``True`` if a connection was established (even if it later + dropped), ``False`` if the connection attempt itself failed so the caller + can try the next resolved address. + """ + connected = False + _LOGGER.info("Connecting to %s ...", url) + try: + with requests.get( + url, + stream=True, + auth=auth, + timeout=TIMEOUT, + headers={"Accept": "text/event-stream"}, + ) as response: + if response.status_code == 401: + raise WebServerLogsError( + "Authentication failed (HTTP 401). Check the 'web_server' " + "'auth' username and password." + ) + if response.status_code in (403, 404): + # Permanent: the endpoint won't appear on retry (wrong version, + # 'log' disabled, or forbidden). Surface it instead of looping. + raise WebServerLogsError( + f"Device returned HTTP {response.status_code} for " + f"{EVENTS_PATH}; the web_server log stream is unavailable. " + "Ensure 'web_server' is version 2 or higher with 'log' enabled." + ) + if response.status_code != 200: + _LOGGER.error( + "Unexpected HTTP %s response from %s", response.status_code, ip + ) + return False + connected = True + _LOGGER.info("Connected to %s", ip) + _consume(response, parser) + except requests.RequestException as err: + if connected: + _LOGGER.info("Log stream from %s ended (%s); reconnecting...", ip, err) + else: + _LOGGER.warning("Could not connect to %s: %s", ip, err) + return connected + + +def run_logs( + hosts: list[str], + port: int, + username: str | None, + password: str | None, +) -> int: + """Stream logs from the first reachable host over the web_server SSE feed. + + Reconnects automatically when the stream drops and returns ``0`` on + ``KeyboardInterrupt`` (Ctrl+C), mirroring how the serial log viewer exits. + """ + from aioesphomeapi import LogParser + + auth = HTTPBasicAuth(username, password) if username and password else None + parser = LogParser() + delay = RECONNECT_DELAY + try: + while True: + if not (urls := _build_urls(hosts, port)): + _LOGGER.error("Could not resolve any of: %s", ", ".join(hosts)) + connected = False + else: + # ``any`` stops at the first address that connects; when that + # stream drops we reconnect to the same set on the next pass. + connected = any(_stream(url, ip, auth, parser) for ip, url in urls) + # Reset the backoff once we reach the device; otherwise grow it + # (capped) so an unreachable host doesn't retry/log once a second. + delay = ( + RECONNECT_DELAY if connected else min(delay * 2, MAX_RECONNECT_DELAY) + ) + time.sleep(delay) + except KeyboardInterrupt: + return 0 diff --git a/esphome/web_server_ota.py b/esphome/web_server_ota.py index 8d0fdeecff..7b508e8527 100644 --- a/esphome/web_server_ota.py +++ b/esphome/web_server_ota.py @@ -12,14 +12,14 @@ import io import logging from pathlib import Path import secrets -import socket from typing import BinaryIO import requests from requests.auth import HTTPBasicAuth from esphome.core import EsphomeError -from esphome.helpers import ProgressBar, resolve_ip_address +from esphome.helpers import ProgressBar +from esphome.web_server_helpers import resolve_web_server_urls _LOGGER = logging.getLogger(__name__) @@ -95,7 +95,7 @@ def _try_upload( from esphome.core import CORE try: - addr_infos = resolve_ip_address(host, port, address_cache=CORE.address_cache) + addr_urls = resolve_web_server_urls(host, port, OTA_PATH) except EsphomeError as err: _LOGGER.error( "Error resolving IP address of %s. Is it connected to WiFi?", host @@ -104,7 +104,7 @@ def _try_upload( _LOGGER.error("(If you know the IP, try --device )") raise WebServerOTAError(err) from err - if not addr_infos: + if not addr_urls: _LOGGER.error("Could not resolve %s", host) return 1, None @@ -113,16 +113,7 @@ def _try_upload( auth = HTTPBasicAuth(username, password) if username and password else None # Iterate resolved IPs (IPv4 + IPv6 candidates) just like espota2 does. - for af, _socktype, _, _, sa in addr_infos: - ip = sa[0] - # IPv6 literals must be wrapped in brackets in URLs; link-local - # addresses need a percent-encoded zone index per RFC 6874. - if af == socket.AF_INET6: - scope = sa[3] if len(sa) >= 4 else 0 - host_part = f"[{ip}%25{scope}]" if scope else f"[{ip}]" - else: - host_part = ip - url = f"http://{host_part}:{port}{OTA_PATH}" + for ip, url in addr_urls: _LOGGER.info("Connecting to %s port %s...", ip, port) try: diff --git a/tests/unit_tests/test_helpers.py b/tests/unit_tests/test_helpers.py index 70c4b90082..84ec479e67 100644 --- a/tests/unit_tests/test_helpers.py +++ b/tests/unit_tests/test_helpers.py @@ -14,7 +14,7 @@ import pytest from esphome import helpers from esphome.address_cache import AddressCache from esphome.core import CORE, EsphomeError -from esphome.helpers import ProgressBar +from esphome.helpers import ProgressBar, format_ip_url @pytest.mark.parametrize( @@ -151,6 +151,22 @@ def test_is_ip_address__invalid(host): assert actual is False +@pytest.mark.parametrize( + ("family", "sockaddr", "expected"), + ( + (socket.AF_INET, ("192.168.1.5", 80), "http://192.168.1.5:80/events"), + (socket.AF_INET6, ("2001:db8::1", 80, 0, 0), "http://[2001:db8::1]:80/events"), + ( + socket.AF_INET6, + ("fe80::1", 8080, 0, 7), + "http://[fe80::1%257]:8080/events", + ), + ), +) +def test_format_ip_url(family, sockaddr, expected): + assert format_ip_url(family, sockaddr, sockaddr[1], "/events") == expected + + @settings(deadline=None) @given(value=ip_addresses(v=4).map(str)) def test_is_ip_address__valid(value): diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index bb06b6c930..784255a9c0 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -49,6 +49,7 @@ from esphome.__main__ import ( has_non_ip_address, has_ota, has_resolvable_address, + has_web_server_logging, has_web_server_ota, mqtt_get_ip, parse_args, @@ -72,6 +73,7 @@ from esphome.const import ( CONF_DISABLED, CONF_ESPHOME, CONF_LEVEL, + CONF_LOG, CONF_LOG_TOPIC, CONF_LOGGER, CONF_MDNS, @@ -86,6 +88,7 @@ from esphome.const import ( CONF_TOPIC, CONF_USE_ADDRESS, CONF_USERNAME, + CONF_VERSION, CONF_WEB_SERVER, CONF_WIFI, KEY_CORE, @@ -734,6 +737,30 @@ def test_choose_upload_log_host_with_ota_device_with_api_config_logging() -> Non assert result == ["192.168.1.100"] +def test_choose_upload_log_host_logging_web_server_only_ip() -> None: + """A web_server-only device with a static IP resolves to that IP for logs.""" + setup_core(config={CONF_WEB_SERVER: {}}, address="192.168.1.100") + + result = choose_upload_log_host( + default="OTA", + check_default=None, + purpose=Purpose.LOGGING, + ) + assert result == ["192.168.1.100"] + + +def test_choose_upload_log_host_logging_web_server_only_mdns() -> None: + """A web_server-only device with a .local name resolves to that hostname.""" + setup_core(config={CONF_WEB_SERVER: {}}, address="test.local") + + result = choose_upload_log_host( + default="OTA", + check_default=None, + purpose=Purpose.LOGGING, + ) + assert result == ["test.local"] + + def test_choose_upload_log_host_logging_without_api_reports_missing_api() -> None: """A resolvable device with only ota: fails logs with a missing-api message.""" setup_core( @@ -773,6 +800,17 @@ def test_unresolved_default_error_unresolvable_keeps_dashboard_hint() -> None: assert "set 'use_address'" in msg +def test_unresolved_default_error_logging_suggests_web_server() -> None: + """The missing-api log message lists web_server among the remediations.""" + setup_core( + config={CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}]}, address="192.168.1.100" + ) + + msg = _unresolved_default_error(Purpose.LOGGING, ["OTA"]) + assert "no 'api:' component is configured" in msg + assert "'web_server:'" in msg + + def test_unresolved_default_error_upload_with_ota_is_generic() -> None: """With ota: present the upload error stays generic, not transport-specific.""" setup_core( @@ -2376,6 +2414,30 @@ def test_has_web_server_ota_returns_false_without_config() -> None: assert has_ota() is True +def test_has_web_server_logging_default() -> None: + """has_web_server_logging is True for a default web_server (v2, log on).""" + setup_core(config={CONF_WEB_SERVER: {}}) + assert has_web_server_logging() is True + + +def test_has_web_server_logging_without_config() -> None: + """has_web_server_logging is False when web_server is not configured.""" + setup_core(config={CONF_API: {}}) + assert has_web_server_logging() is False + + +def test_has_web_server_logging_v1_has_no_events_stream() -> None: + """has_web_server_logging is False for v1, which has no /events endpoint.""" + setup_core(config={CONF_WEB_SERVER: {CONF_VERSION: 1}}) + assert has_web_server_logging() is False + + +def test_has_web_server_logging_respects_log_disabled() -> None: + """has_web_server_logging is False when the web_server log option is off.""" + setup_core(config={CONF_WEB_SERVER: {CONF_LOG: False}}) + assert has_web_server_logging() is False + + def test_upload_program_web_server_only_auto_dispatches( mock_run_web_server_ota: Mock, mock_run_ota: Mock, @@ -2945,6 +3007,77 @@ def test_show_logs_network_with_mqtt_only( ) +@patch("esphome.web_server_logs.run_logs") +def test_show_logs_web_server( + mock_run_logs: Mock, +) -> None: + """A web_server-only device streams logs over the HTTP SSE endpoint.""" + setup_core( + config={ + "logger": {}, + CONF_WEB_SERVER: {CONF_PORT: 80}, + # No API or MQTT configured + }, + platform=PLATFORM_ESP32, + ) + mock_run_logs.return_value = 0 + + result = show_logs(CORE.config, MockArgs(), ["192.168.1.100"]) + + assert result == 0 + mock_run_logs.assert_called_once_with(["192.168.1.100"], 80, None, None) + + +@patch("esphome.web_server_logs.run_logs") +def test_show_logs_web_server_with_auth_and_port( + mock_run_logs: Mock, +) -> None: + """web_server port and basic-auth credentials are forwarded to the streamer.""" + setup_core( + config={ + "logger": {}, + CONF_WEB_SERVER: { + CONF_PORT: 8080, + CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "secret"}, + }, + }, + platform=PLATFORM_ESP32, + ) + mock_run_logs.return_value = 0 + + result = show_logs(CORE.config, MockArgs(), ["192.168.1.100"]) + + assert result == 0 + mock_run_logs.assert_called_once_with(["192.168.1.100"], 8080, "admin", "secret") + + +@patch("esphome.web_server_logs.run_logs") +@patch("esphome.mqtt.show_logs") +def test_show_logs_mqtt_preferred_over_web_server( + mock_mqtt_show_logs: Mock, + mock_run_logs: Mock, +) -> None: + """With both MQTT logging and web_server, MQTT wins (API > MQTT > web_server).""" + setup_core( + config={ + "logger": {}, + "mqtt": {CONF_BROKER: "mqtt.local"}, + CONF_WEB_SERVER: {CONF_PORT: 80}, + }, + platform=PLATFORM_ESP32, + ) + mock_mqtt_show_logs.return_value = 0 + + args = MockArgs( + topic="esphome/logs", username="user", password="pass", client_id="client" + ) + result = show_logs(CORE.config, args, ["192.168.1.100"]) + + assert result == 0 + mock_mqtt_show_logs.assert_called_once() + mock_run_logs.assert_not_called() + + def test_show_logs_no_method_configured() -> None: """Test show_logs when no remote logging method is configured.""" setup_core( diff --git a/tests/unit_tests/test_web_server_helpers.py b/tests/unit_tests/test_web_server_helpers.py new file mode 100644 index 0000000000..0280630d69 --- /dev/null +++ b/tests/unit_tests/test_web_server_helpers.py @@ -0,0 +1,64 @@ +"""Unit tests for esphome.web_server_helpers module.""" + +from __future__ import annotations + +import socket + +import pytest + +from esphome.const import ( + CONF_AUTH, + CONF_PASSWORD, + CONF_PORT, + CONF_USERNAME, + CONF_WEB_SERVER, +) +from esphome.core import EsphomeError +from esphome.web_server_helpers import ( + get_web_server_connection, + resolve_web_server_urls, +) + + +def test_resolve_web_server_urls_maps_ipv4_and_ipv6( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Each resolved address becomes an (ip, url) pair with IPv6 bracketing.""" + addr_infos = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.5", 80)), + (socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 80, 0, 7)), + ] + monkeypatch.setattr( + "esphome.web_server_helpers.resolve_ip_address", + lambda *args, **kwargs: addr_infos, + ) + + assert resolve_web_server_urls("dev.local", 80, "/events") == [ + ("192.168.1.5", "http://192.168.1.5:80/events"), + ("fe80::1", "http://[fe80::1%257]:80/events"), + ] + + +def test_get_web_server_connection_without_auth() -> None: + """Port is returned and credentials are None when no auth is configured.""" + config = {CONF_WEB_SERVER: {CONF_PORT: 80}} + + assert get_web_server_connection(config) == (80, None, None) + + +def test_get_web_server_connection_with_auth() -> None: + """Port and HTTP Basic credentials are returned when auth is configured.""" + config = { + CONF_WEB_SERVER: { + CONF_PORT: 8080, + CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "secret"}, + } + } + + assert get_web_server_connection(config) == (8080, "admin", "secret") + + +def test_get_web_server_connection_missing_component() -> None: + """A config without web_server raises a clear error.""" + with pytest.raises(EsphomeError, match="web_server.*not configured"): + get_web_server_connection({}) diff --git a/tests/unit_tests/test_web_server_logs.py b/tests/unit_tests/test_web_server_logs.py new file mode 100644 index 0000000000..bbdf37bed7 --- /dev/null +++ b/tests/unit_tests/test_web_server_logs.py @@ -0,0 +1,397 @@ +"""Unit tests for esphome.web_server_logs module.""" + +from __future__ import annotations + +from collections.abc import Iterator +import logging +import socket +from typing import Self +from unittest.mock import MagicMock + +import pytest +import requests +from requests.auth import HTTPBasicAuth + +from esphome import web_server_logs +from esphome.core import EsphomeError +from esphome.web_server_logs import ( + EVENTS_PATH, + WebServerLogsError, + _build_urls, + _consume, + _stream, + run_logs, +) + +# A realistic slice of the web_server /events SSE stream: an initial ping +# carrying the config, a state frame, two log frames (one multi-line), plus +# comment/id/retry lines that must be ignored. +SSE_LINES = [ + "retry: 30000", + "id: 12345", + "event: ping", + 'data: {"title":"dev","log":true}', + "", + "event: state", + 'data: {"id":"sensor-x","state":"ON"}', + "", + "event: log", + "data: \x1b[0;32m[I][main:001]: hello\x1b[0m", + "", + ": keepalive-comment", + "event: log", + "data: line one", + "data: line two", + "", +] + + +class _FakeResponse: + """Minimal stand-in for a streamed ``requests`` response.""" + + def __init__(self, status_code: int, lines: list[str]) -> None: + self.status_code = status_code + self._lines = lines + + def __enter__(self) -> Self: + return self + + def __exit__(self, *exc: object) -> bool: + return False + + def iter_lines(self) -> Iterator[bytes]: + for line in self._lines: + yield line.encode("utf8") + + +@pytest.fixture +def fake_parser() -> MagicMock: + """A LogParser whose parse_line returns the raw line unchanged.""" + parser = MagicMock() + parser.parse_line.side_effect = lambda line, time_str: line + return parser + + +def _patch_resolve( + monkeypatch: pytest.MonkeyPatch, + addr_infos: list[tuple[int, int, int, str, tuple]], +) -> None: + monkeypatch.setattr( + "esphome.web_server_helpers.resolve_ip_address", + lambda *args, **kwargs: addr_infos, + ) + + +# --------------------------------------------------------------------------- +# _build_urls +# --------------------------------------------------------------------------- + + +def test_build_urls_ipv4(monkeypatch: pytest.MonkeyPatch) -> None: + """An IPv4 host resolves to a plain http://ip:port/events URL.""" + _patch_resolve( + monkeypatch, + [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.5", 80))], + ) + + assert _build_urls(["dev.local"], 80) == [ + ("192.168.1.5", f"http://192.168.1.5:80{EVENTS_PATH}") + ] + + +def test_build_urls_ipv6_brackets_and_zone(monkeypatch: pytest.MonkeyPatch) -> None: + """IPv6 literals are bracketed; link-local addresses get a %25 zone index.""" + _patch_resolve( + monkeypatch, + [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 8080, 0, 7))], + ) + + assert _build_urls(["dev.local"], 8080) == [ + ("fe80::1", f"http://[fe80::1%257]:8080{EVENTS_PATH}") + ] + + +def test_build_urls_dedups_and_skips_unresolvable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Duplicate resolved IPs collapse to one URL; resolve errors are skipped.""" + calls: list[str] = [] + + def fake_resolve(host: str, port: int, **kwargs: object) -> list[tuple]: + calls.append(host) + if host == "bad": + raise EsphomeError("nope") + return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", port))] + + monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", fake_resolve) + + # "good" and "dup" both resolve to 10.0.0.1, "bad" raises. + assert _build_urls(["good", "bad", "dup"], 80) == [ + ("10.0.0.1", f"http://10.0.0.1:80{EVENTS_PATH}") + ] + assert calls == ["good", "bad", "dup"] + + +# --------------------------------------------------------------------------- +# _consume (SSE parsing) +# --------------------------------------------------------------------------- + + +def test_consume_emits_only_log_frames( + monkeypatch: pytest.MonkeyPatch, fake_parser: MagicMock +) -> None: + """Only event: log data lines are printed; ping/state/comments are ignored.""" + printed: list[str] = [] + monkeypatch.setattr(web_server_logs, "safe_print", printed.append) + + _consume(_FakeResponse(200, SSE_LINES), fake_parser) + + assert printed == [ + "\x1b[0;32m[I][main:001]: hello\x1b[0m", + "line one", + "line two", + ] + + +def test_consume_ignores_unterminated_trailing_frame( + monkeypatch: pytest.MonkeyPatch, fake_parser: MagicMock +) -> None: + """A log frame without its terminating blank line is not emitted.""" + printed: list[str] = [] + monkeypatch.setattr(web_server_logs, "safe_print", printed.append) + + _consume(_FakeResponse(200, ["event: log", "data: dangling"]), fake_parser) + + assert printed == [] + + +# --------------------------------------------------------------------------- +# _stream +# --------------------------------------------------------------------------- + + +def test_stream_returns_false_when_connect_fails( + monkeypatch: pytest.MonkeyPatch, + fake_parser: MagicMock, + caplog: pytest.LogCaptureFixture, +) -> None: + """A failed connection logs a warning and reports not-connected.""" + + def boom(*args: object, **kwargs: object) -> _FakeResponse: + raise requests.ConnectionError("refused") + + monkeypatch.setattr(requests, "get", boom) + + with caplog.at_level(logging.WARNING): + assert ( + _stream("http://10.0.0.1:80/events", "10.0.0.1", None, fake_parser) is False + ) + assert "Could not connect to 10.0.0.1" in caplog.text + + +def test_stream_returns_true_when_established_then_dropped( + monkeypatch: pytest.MonkeyPatch, + fake_parser: MagicMock, + caplog: pytest.LogCaptureFixture, +) -> None: + """A mid-stream drop after connecting reports connected so we reconnect.""" + printed: list[str] = [] + monkeypatch.setattr(web_server_logs, "safe_print", printed.append) + + class _DroppingResponse(_FakeResponse): + def iter_lines(self) -> Iterator[bytes]: + yield b"event: log" + yield b"data: before-drop" + yield b"" + raise requests.exceptions.ChunkedEncodingError("connection lost") + + monkeypatch.setattr(requests, "get", lambda *a, **kw: _DroppingResponse(200, [])) + + with caplog.at_level(logging.INFO): + assert ( + _stream("http://10.0.0.1:80/events", "10.0.0.1", None, fake_parser) is True + ) + assert printed == ["before-drop"] + assert "reconnecting" in caplog.text + + +# --------------------------------------------------------------------------- +# run_logs +# --------------------------------------------------------------------------- + + +def test_run_logs_streams_then_reconnects_until_interrupt( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A dropped stream reconnects; KeyboardInterrupt during the pause exits 0.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + printed: list[str] = [] + monkeypatch.setattr(web_server_logs, "safe_print", printed.append) + monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(200, SSE_LINES)) + + def stop(_delay: float) -> None: + raise KeyboardInterrupt + + monkeypatch.setattr(web_server_logs.time, "sleep", stop) + + assert run_logs(["dev.local"], 80, None, None) == 0 + # The single stream was consumed before the reconnect pause interrupted us. + # run_logs renders through the real LogParser, which prefixes a timestamp, + # so assert on the payloads rather than exact equality. + assert len(printed) == 3 + assert "[I][main:001]: hello" in printed[0] + assert "line one" in printed[1] + assert "line two" in printed[2] + + +def test_run_logs_passes_basic_auth(monkeypatch: pytest.MonkeyPatch) -> None: + """Username + password are forwarded as HTTP Basic auth on the request.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + monkeypatch.setattr(web_server_logs, "safe_print", lambda line: None) + captured: dict[str, object] = {} + + def fake_get(url: str, **kwargs: object) -> _FakeResponse: + captured.update(kwargs) + captured["url"] = url + return _FakeResponse(200, SSE_LINES) + + monkeypatch.setattr(requests, "get", fake_get) + monkeypatch.setattr( + web_server_logs.time, + "sleep", + lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()), + ) + + assert run_logs(["dev.local"], 80, "admin", "secret") == 0 + auth = captured["auth"] + assert isinstance(auth, HTTPBasicAuth) + assert (auth.username, auth.password) == ("admin", "secret") + assert captured["stream"] is True + assert captured["headers"] == {"Accept": "text/event-stream"} + + +def test_run_logs_no_auth_when_credentials_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """No auth object is sent when username/password are not configured.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + monkeypatch.setattr(web_server_logs, "safe_print", lambda line: None) + captured: dict[str, object] = {} + + def fake_get(url: str, **kwargs: object) -> _FakeResponse: + captured.update(kwargs) + return _FakeResponse(200, SSE_LINES) + + monkeypatch.setattr(requests, "get", fake_get) + monkeypatch.setattr( + web_server_logs.time, + "sleep", + lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()), + ) + + assert run_logs(["dev.local"], 80, None, None) == 0 + assert captured["auth"] is None + + +def test_run_logs_raises_on_auth_failure(monkeypatch: pytest.MonkeyPatch) -> None: + """HTTP 401 aborts with a clear error rather than reconnecting forever.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(401, [])) + + with pytest.raises(WebServerLogsError, match="Authentication failed"): + run_logs(["dev.local"], 80, "admin", "bad") + + +def test_run_logs_retries_on_transient_status( + monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture +) -> None: + """A transient non-200 (e.g. 503) is logged and the loop retries.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(503, [])) + monkeypatch.setattr( + web_server_logs.time, + "sleep", + lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()), + ) + + with caplog.at_level(logging.ERROR): + assert run_logs(["dev.local"], 80, None, None) == 0 + assert "Unexpected HTTP 503" in caplog.text + + +@pytest.mark.parametrize("status", (403, 404)) +def test_run_logs_raises_on_permanent_status( + monkeypatch: pytest.MonkeyPatch, status: int +) -> None: + """A permanent 403/404 aborts instead of retrying the endpoint forever.""" + monkeypatch.setattr( + web_server_logs, + "_build_urls", + lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")], + ) + monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(status, [])) + + with pytest.raises(WebServerLogsError, match=str(status)): + run_logs(["dev.local"], 80, None, None) + + +def test_run_logs_backs_off_on_repeated_failure( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Consecutive unreachable attempts grow the reconnect delay up to the cap.""" + monkeypatch.setattr(web_server_logs, "_build_urls", lambda hosts, port: []) + delays: list[float] = [] + + def record(delay: float) -> None: + delays.append(delay) + if len(delays) >= 4: + raise KeyboardInterrupt + + monkeypatch.setattr(web_server_logs.time, "sleep", record) + + assert run_logs(["dev.local"], 80, None, None) == 0 + # 1 -> 2 -> 4 -> 8 ... doubling, capped at MAX_RECONNECT_DELAY (10.0). + assert delays == [2.0, 4.0, 8.0, 10.0] + + +def test_run_logs_reports_unresolvable( + monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture +) -> None: + """When no host resolves, an error is logged and the loop pauses/retries.""" + monkeypatch.setattr(web_server_logs, "_build_urls", lambda hosts, port: []) + + # Let the first reconnect pause pass so the loop continues, then interrupt + # on the second so the retry path (the ``continue``) is exercised. + sleeps = {"n": 0} + + def sleep(_delay: float) -> None: + sleeps["n"] += 1 + if sleeps["n"] >= 2: + raise KeyboardInterrupt + + monkeypatch.setattr(web_server_logs.time, "sleep", sleep) + + with caplog.at_level(logging.ERROR): + assert run_logs(["dev.local"], 80, None, None) == 0 + assert sleeps["n"] == 2 + assert "Could not resolve" in caplog.text diff --git a/tests/unit_tests/test_web_server_ota.py b/tests/unit_tests/test_web_server_ota.py index 606905e36e..bde04f4db7 100644 --- a/tests/unit_tests/test_web_server_ota.py +++ b/tests/unit_tests/test_web_server_ota.py @@ -46,7 +46,7 @@ def _patch_resolve( for host, port in hosts ] monkeypatch.setattr( - "esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos + "esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos ) @@ -475,7 +475,7 @@ def test_run_ota_resolution_failure( def _raise(*_args, **_kwargs): raise EsphomeError("dns failed") - monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _raise) + monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _raise) exit_code, host = run_ota(["does.not.exist"], 80, None, None, firmware) @@ -491,7 +491,7 @@ def test_run_ota_resolution_failure_dashboard_mode( def _raise(*_args, **_kwargs): raise EsphomeError("dns failed") - monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _raise) + monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _raise) monkeypatch.setattr(CORE, "dashboard", True) try: exit_code, host = run_ota(["does.not.exist"], 80, None, None, firmware) @@ -541,7 +541,7 @@ def test_run_ota_multiple_hosts_first_fails( def _resolve(host, port, address_cache=None): # noqa: ARG001 return addr_lookup[host] - monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _resolve) + monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _resolve) with patch( "esphome.web_server_ota.requests.post", @@ -570,7 +570,7 @@ def test_run_ota_all_hosts_return_failure_no_exception( def _resolve(host, port, address_cache=None): # noqa: ARG001 return addr_lookup[host] - monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _resolve) + monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _resolve) exit_code, host = run_ota(["a.local", "b.local"], 80, None, None, firmware) @@ -633,7 +633,7 @@ def test_run_ota_ipv6_url_brackets_host( (socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("2001:db8::1", 80, 0, 0)), ] monkeypatch.setattr( - "esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos + "esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos ) with patch( @@ -656,7 +656,7 @@ def test_run_ota_ipv6_link_local_includes_scope_id( (socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 80, 0, 3)), ] monkeypatch.setattr( - "esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos + "esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos ) with patch(