mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 12:53:26 +00:00
Merge remote-tracking branch 'origin/web-server-logs' into integration
This commit is contained in:
@@ -28,13 +28,13 @@ from esphome.const import (
|
||||
ALLOWED_NAME_CHARS,
|
||||
ARGUMENT_HELP_DEVICE,
|
||||
CONF_API,
|
||||
CONF_AUTH,
|
||||
CONF_BAUD_RATE,
|
||||
CONF_BROKER,
|
||||
CONF_DEASSERT_RTS_DTR,
|
||||
CONF_DISABLED,
|
||||
CONF_ESPHOME,
|
||||
CONF_LEVEL,
|
||||
CONF_LOG,
|
||||
CONF_LOG_TOPIC,
|
||||
CONF_LOGGER,
|
||||
CONF_MDNS,
|
||||
@@ -48,7 +48,7 @@ from esphome.const import (
|
||||
CONF_PORT,
|
||||
CONF_SUBSTITUTIONS,
|
||||
CONF_TOPIC,
|
||||
CONF_USERNAME,
|
||||
CONF_VERSION,
|
||||
CONF_WEB_SERVER,
|
||||
CONF_WIFI,
|
||||
ENV_NOGITIGNORE,
|
||||
@@ -280,8 +280,8 @@ def _unresolved_default_error(purpose: Purpose, defaults: list[str]) -> str:
|
||||
if purpose == Purpose.LOGGING and not has_api():
|
||||
return (
|
||||
"Cannot view logs over the network: no 'api:' component is "
|
||||
"configured. Network log streaming requires the native API; add "
|
||||
"an 'api:' component, enable MQTT logging, or view logs over USB."
|
||||
"configured. Add an 'api:' component, enable MQTT logging, add a "
|
||||
"'web_server:' component, or view logs over USB."
|
||||
)
|
||||
if purpose == Purpose.UPLOADING and not has_ota():
|
||||
return (
|
||||
@@ -321,9 +321,12 @@ def choose_upload_log_host(
|
||||
]
|
||||
resolved.append(choose_prompt(options, purpose=purpose))
|
||||
elif device == "OTA":
|
||||
# Logs can stream over a network transport via the native API
|
||||
# or the web_server HTTP SSE feed.
|
||||
network_logging = has_api() or has_web_server_logging()
|
||||
# ensure IP adresses are used first
|
||||
if is_ip_address(CORE.address) and (
|
||||
(purpose == Purpose.LOGGING and has_api())
|
||||
(purpose == Purpose.LOGGING and network_logging)
|
||||
or (purpose == Purpose.UPLOADING and has_ota())
|
||||
):
|
||||
resolved.extend(_resolve_with_cache(CORE.address, purpose))
|
||||
@@ -335,7 +338,11 @@ def choose_upload_log_host(
|
||||
if has_mqtt_logging():
|
||||
resolved.append("MQTT")
|
||||
|
||||
if has_api() and has_non_ip_address() and has_resolvable_address():
|
||||
if (
|
||||
network_logging
|
||||
and has_non_ip_address()
|
||||
and has_resolvable_address()
|
||||
):
|
||||
resolved.extend(_ota_hostnames_for_default(purpose))
|
||||
|
||||
elif purpose == Purpose.UPLOADING:
|
||||
@@ -397,7 +404,7 @@ def choose_upload_log_host(
|
||||
mqtt_config = CORE.config[CONF_MQTT]
|
||||
options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT"))
|
||||
|
||||
if has_api():
|
||||
if has_api() or has_web_server_logging():
|
||||
add_ota_options()
|
||||
|
||||
elif purpose == Purpose.UPLOADING and has_ota():
|
||||
@@ -490,6 +497,21 @@ def has_web_server_ota() -> bool:
|
||||
)
|
||||
|
||||
|
||||
def has_web_server_logging() -> bool:
|
||||
"""Check if logs can be streamed over the web_server HTTP SSE endpoint.
|
||||
|
||||
The ``web_server`` component exposes a ``/events`` Server-Sent Events
|
||||
stream that carries ``event: log`` frames. This requires version 2+ (the
|
||||
v1 UI has no ``/events`` endpoint) and the ``log`` option enabled (default).
|
||||
"""
|
||||
web_conf = CORE.config.get(CONF_WEB_SERVER)
|
||||
if web_conf is None:
|
||||
return False
|
||||
if web_conf.get(CONF_VERSION, 2) == 1:
|
||||
return False
|
||||
return web_conf.get(CONF_LOG, True)
|
||||
|
||||
|
||||
def has_mqtt_ip_lookup() -> bool:
|
||||
"""Check if MQTT is available and IP lookup is supported."""
|
||||
from esphome.components.mqtt import CONF_DISCOVER_IP
|
||||
@@ -1291,25 +1313,23 @@ def _upload_via_native_api(
|
||||
def _upload_via_web_server(
|
||||
config: ConfigType, network_devices: list[str], binary: Path
|
||||
) -> tuple[int, str | None]:
|
||||
web_conf = config.get(CONF_WEB_SERVER)
|
||||
if not web_conf:
|
||||
raise EsphomeError(
|
||||
f"Cannot upload via web_server OTA: the {CONF_WEB_SERVER} component "
|
||||
f"is not configured."
|
||||
)
|
||||
|
||||
remote_port = int(web_conf[CONF_PORT])
|
||||
auth = web_conf.get(CONF_AUTH) or {}
|
||||
username = auth.get(CONF_USERNAME)
|
||||
password = auth.get(CONF_PASSWORD)
|
||||
|
||||
from esphome import web_server_ota
|
||||
from esphome.web_server_helpers import get_web_server_connection
|
||||
|
||||
remote_port, username, password = get_web_server_connection(config)
|
||||
return web_server_ota.run_ota(
|
||||
network_devices, remote_port, username, password, binary
|
||||
)
|
||||
|
||||
|
||||
def _show_logs_via_web_server(config: ConfigType, network_devices: list[str]) -> int:
|
||||
from esphome import web_server_logs
|
||||
from esphome.web_server_helpers import get_web_server_connection
|
||||
|
||||
port, username, password = get_web_server_connection(config)
|
||||
return web_server_logs.run_logs(network_devices, port, username, password)
|
||||
|
||||
|
||||
# Layout of esp_partition_info_t on flash. Each entry is 32 bytes, leading with a
|
||||
# 16-bit little-endian magic. ESP-IDF defines ESP_PARTITION_MAGIC = 0x50AA (stored as
|
||||
# bytes 0xAA, 0x50) for partition entries and ESP_PARTITION_MAGIC_MD5 = 0xEBEB for the
|
||||
@@ -1438,6 +1458,13 @@ def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int
|
||||
config, args.topic, args.username, args.password, args.client_id
|
||||
)
|
||||
|
||||
# Fall back to the web_server HTTP SSE log stream for devices that have
|
||||
# web_server: but no api: (the logging counterpart to web_server OTA).
|
||||
if has_web_server_logging() and (
|
||||
network_devices := _resolve_network_devices(devices, config, args)
|
||||
):
|
||||
return _show_logs_via_web_server(config, network_devices)
|
||||
|
||||
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")
|
||||
|
||||
|
||||
|
||||
@@ -328,6 +328,24 @@ def resolve_ip_address(
|
||||
return res
|
||||
|
||||
|
||||
def format_ip_url(family: int, sockaddr: tuple, port: int, path: str) -> str:
|
||||
"""Build an ``http://host:port/path`` URL for a resolved address.
|
||||
|
||||
``family``/``sockaddr`` come from a :func:`resolve_ip_address` entry. IPv6
|
||||
literals must be wrapped in brackets in URLs; link-local addresses need a
|
||||
percent-encoded zone index per RFC 6874.
|
||||
"""
|
||||
import socket
|
||||
|
||||
ip = sockaddr[0]
|
||||
if family == socket.AF_INET6:
|
||||
scope = sockaddr[3] if len(sockaddr) >= 4 else 0
|
||||
host_part = f"[{ip}%25{scope}]" if scope else f"[{ip}]"
|
||||
else:
|
||||
host_part = ip
|
||||
return f"http://{host_part}:{port}{path}"
|
||||
|
||||
|
||||
def sort_ip_addresses(address_list: list[str]) -> list[str]:
|
||||
"""Takes a list of IP addresses in string form, e.g. from mDNS or MQTT,
|
||||
and sorts them into the best order to actually try connecting to them.
|
||||
|
||||
43
esphome/web_server_helpers.py
Normal file
43
esphome/web_server_helpers.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Shared helpers for the web_server HTTP transports (OTA upload and logs)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from esphome.const import (
|
||||
CONF_AUTH,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME,
|
||||
CONF_WEB_SERVER,
|
||||
)
|
||||
from esphome.core import CORE, EsphomeError
|
||||
from esphome.helpers import format_ip_url, resolve_ip_address
|
||||
from esphome.types import ConfigType
|
||||
|
||||
|
||||
def resolve_web_server_urls(host: str, port: int, path: str) -> list[tuple[str, str]]:
|
||||
"""Resolve ``host`` to ``(ip, url)`` pairs for the web_server ``path``.
|
||||
|
||||
Wraps :func:`resolve_ip_address` (honoring ``CORE.address_cache``) and
|
||||
formats each resolved address into an ``http://host:port/path`` URL via
|
||||
:func:`format_ip_url`, handling both IPv4 and IPv6. Shared by the
|
||||
web_server OTA upload and log streaming paths.
|
||||
"""
|
||||
addr_infos = resolve_ip_address(host, port, address_cache=CORE.address_cache)
|
||||
return [
|
||||
(sockaddr[0], format_ip_url(family, sockaddr, port, path))
|
||||
for family, _socktype, _, _, sockaddr in addr_infos
|
||||
]
|
||||
|
||||
|
||||
def get_web_server_connection(config: ConfigType) -> tuple[int, str | None, str | None]:
|
||||
"""Return ``(port, username, password)`` for the web_server HTTP endpoint.
|
||||
|
||||
Reads the port and optional HTTP Basic-auth credentials from the validated
|
||||
``web_server:`` config, shared by the web_server OTA upload and log
|
||||
streaming paths. Raises :class:`EsphomeError` if ``web_server`` is absent.
|
||||
"""
|
||||
web_conf = config.get(CONF_WEB_SERVER)
|
||||
if not web_conf:
|
||||
raise EsphomeError(f"The {CONF_WEB_SERVER} component is not configured.")
|
||||
auth = web_conf.get(CONF_AUTH) or {}
|
||||
return int(web_conf[CONF_PORT]), auth.get(CONF_USERNAME), auth.get(CONF_PASSWORD)
|
||||
189
esphome/web_server_logs.py
Normal file
189
esphome/web_server_logs.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""Stream device logs over the ``web_server`` component's HTTP SSE endpoint.
|
||||
|
||||
The ``web_server`` component exposes a Server-Sent Events stream at ``/events``
|
||||
that multiplexes entity state, keepalive pings, and log lines (``event: log``).
|
||||
This is the logging counterpart to the web_server OTA upload path
|
||||
(:mod:`esphome.web_server_ota`); it lets ``esphome logs`` reach a device that
|
||||
has ``web_server:`` configured but no ``api:``.
|
||||
|
||||
Only the ``event: log`` frames are rendered; the payload is the device's
|
||||
already-formatted, ANSI-colored log line, so it is passed through the same
|
||||
``LogParser`` + ``safe_print`` path the serial and native-API log viewers use.
|
||||
The stream is long-lived and the server drops idle connections, so the reader
|
||||
reconnects automatically until interrupted.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.util import safe_print
|
||||
from esphome.web_server_helpers import resolve_web_server_urls
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from aioesphomeapi import LogParser
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
EVENTS_PATH = "/events"
|
||||
# (connect_timeout, read_timeout). The device sends a keepalive ``ping`` every
|
||||
# 10s, so a 30s read timeout tolerates a few missed pings before we treat the
|
||||
# connection as dead and reconnect.
|
||||
TIMEOUT = (10.0, 30.0)
|
||||
# Pause between reconnect attempts so a downed device doesn't spin the CPU.
|
||||
RECONNECT_DELAY = 1.0
|
||||
# Upper bound for the exponential backoff applied to consecutive failures, so an
|
||||
# unreachable host backs off instead of retrying (and logging) once a second.
|
||||
MAX_RECONNECT_DELAY = 10.0
|
||||
|
||||
|
||||
class WebServerLogsError(EsphomeError):
|
||||
"""Raised when the web_server log stream cannot be used (e.g. bad auth)."""
|
||||
|
||||
|
||||
def _build_urls(hosts: list[str], port: int) -> list[tuple[str, str]]:
|
||||
"""Resolve ``hosts`` to ``(ip, url)`` pairs for the ``/events`` endpoint."""
|
||||
urls: list[tuple[str, str]] = []
|
||||
seen: set[str] = set()
|
||||
for host in hosts:
|
||||
try:
|
||||
resolved = resolve_web_server_urls(host, port, EVENTS_PATH)
|
||||
except EsphomeError as err:
|
||||
_LOGGER.warning("Error resolving IP address of %s: %s", host, err)
|
||||
continue
|
||||
for ip, url in resolved:
|
||||
if url not in seen:
|
||||
seen.add(url)
|
||||
urls.append((ip, url))
|
||||
return urls
|
||||
|
||||
|
||||
def _emit(data_lines: list[str], parser: LogParser) -> None:
|
||||
"""Render the accumulated ``data:`` lines of one ``event: log`` frame."""
|
||||
time_ = datetime.now().astimezone()
|
||||
milliseconds = time_.microsecond // 1000
|
||||
time_str = (
|
||||
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]"
|
||||
)
|
||||
for line in data_lines:
|
||||
safe_print(parser.parse_line(line, time_str))
|
||||
|
||||
|
||||
def _consume(response: requests.Response, parser: LogParser) -> None:
|
||||
"""Parse the SSE stream, rendering only ``event: log`` frames.
|
||||
|
||||
Implements the minimal slice of the SSE grammar the ``web_server`` stream
|
||||
uses: ``field: value`` lines (with one optional leading space after the
|
||||
colon) accumulated until a blank line dispatches the frame. ``id:``,
|
||||
``retry:``, and comment (``:``) lines are ignored, as are non-``log``
|
||||
events (``ping``, ``state``, ...).
|
||||
"""
|
||||
event_type = "message"
|
||||
data_lines: list[str] = []
|
||||
# Iterate bytes and decode as UTF-8 ourselves (matching run_miniterm); the
|
||||
# text/event-stream response has no charset, so requests' decode_unicode
|
||||
# would fall back to Latin-1 and mojibake UTF-8 log characters.
|
||||
for raw in response.iter_lines():
|
||||
line = raw.decode("utf8", "backslashreplace")
|
||||
if not line:
|
||||
if event_type == "log" and data_lines:
|
||||
_emit(data_lines, parser)
|
||||
event_type = "message"
|
||||
data_lines = []
|
||||
continue
|
||||
if line.startswith(":"):
|
||||
continue
|
||||
field, _, value = line.partition(":")
|
||||
value = value.removeprefix(" ")
|
||||
if field == "event":
|
||||
event_type = value
|
||||
elif field == "data":
|
||||
data_lines.append(value)
|
||||
|
||||
|
||||
def _stream(url: str, ip: str, auth: HTTPBasicAuth | None, parser: LogParser) -> bool:
|
||||
"""Connect and stream one session.
|
||||
|
||||
Returns ``True`` if a connection was established (even if it later
|
||||
dropped), ``False`` if the connection attempt itself failed so the caller
|
||||
can try the next resolved address.
|
||||
"""
|
||||
connected = False
|
||||
_LOGGER.info("Connecting to %s ...", url)
|
||||
try:
|
||||
with requests.get(
|
||||
url,
|
||||
stream=True,
|
||||
auth=auth,
|
||||
timeout=TIMEOUT,
|
||||
headers={"Accept": "text/event-stream"},
|
||||
) as response:
|
||||
if response.status_code == 401:
|
||||
raise WebServerLogsError(
|
||||
"Authentication failed (HTTP 401). Check the 'web_server' "
|
||||
"'auth' username and password."
|
||||
)
|
||||
if response.status_code in (403, 404):
|
||||
# Permanent: the endpoint won't appear on retry (wrong version,
|
||||
# 'log' disabled, or forbidden). Surface it instead of looping.
|
||||
raise WebServerLogsError(
|
||||
f"Device returned HTTP {response.status_code} for "
|
||||
f"{EVENTS_PATH}; the web_server log stream is unavailable. "
|
||||
"Ensure 'web_server' is version 2 or higher with 'log' enabled."
|
||||
)
|
||||
if response.status_code != 200:
|
||||
_LOGGER.error(
|
||||
"Unexpected HTTP %s response from %s", response.status_code, ip
|
||||
)
|
||||
return False
|
||||
connected = True
|
||||
_LOGGER.info("Connected to %s", ip)
|
||||
_consume(response, parser)
|
||||
except requests.RequestException as err:
|
||||
if connected:
|
||||
_LOGGER.info("Log stream from %s ended (%s); reconnecting...", ip, err)
|
||||
else:
|
||||
_LOGGER.warning("Could not connect to %s: %s", ip, err)
|
||||
return connected
|
||||
|
||||
|
||||
def run_logs(
|
||||
hosts: list[str],
|
||||
port: int,
|
||||
username: str | None,
|
||||
password: str | None,
|
||||
) -> int:
|
||||
"""Stream logs from the first reachable host over the web_server SSE feed.
|
||||
|
||||
Reconnects automatically when the stream drops and returns ``0`` on
|
||||
``KeyboardInterrupt`` (Ctrl+C), mirroring how the serial log viewer exits.
|
||||
"""
|
||||
from aioesphomeapi import LogParser
|
||||
|
||||
auth = HTTPBasicAuth(username, password) if username and password else None
|
||||
parser = LogParser()
|
||||
delay = RECONNECT_DELAY
|
||||
try:
|
||||
while True:
|
||||
if not (urls := _build_urls(hosts, port)):
|
||||
_LOGGER.error("Could not resolve any of: %s", ", ".join(hosts))
|
||||
connected = False
|
||||
else:
|
||||
# ``any`` stops at the first address that connects; when that
|
||||
# stream drops we reconnect to the same set on the next pass.
|
||||
connected = any(_stream(url, ip, auth, parser) for ip, url in urls)
|
||||
# Reset the backoff once we reach the device; otherwise grow it
|
||||
# (capped) so an unreachable host doesn't retry/log once a second.
|
||||
delay = (
|
||||
RECONNECT_DELAY if connected else min(delay * 2, MAX_RECONNECT_DELAY)
|
||||
)
|
||||
time.sleep(delay)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
@@ -12,14 +12,14 @@ import io
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import secrets
|
||||
import socket
|
||||
from typing import BinaryIO
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.helpers import ProgressBar, resolve_ip_address
|
||||
from esphome.helpers import ProgressBar
|
||||
from esphome.web_server_helpers import resolve_web_server_urls
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -95,7 +95,7 @@ def _try_upload(
|
||||
from esphome.core import CORE
|
||||
|
||||
try:
|
||||
addr_infos = resolve_ip_address(host, port, address_cache=CORE.address_cache)
|
||||
addr_urls = resolve_web_server_urls(host, port, OTA_PATH)
|
||||
except EsphomeError as err:
|
||||
_LOGGER.error(
|
||||
"Error resolving IP address of %s. Is it connected to WiFi?", host
|
||||
@@ -104,7 +104,7 @@ def _try_upload(
|
||||
_LOGGER.error("(If you know the IP, try --device <IP>)")
|
||||
raise WebServerOTAError(err) from err
|
||||
|
||||
if not addr_infos:
|
||||
if not addr_urls:
|
||||
_LOGGER.error("Could not resolve %s", host)
|
||||
return 1, None
|
||||
|
||||
@@ -113,16 +113,7 @@ def _try_upload(
|
||||
auth = HTTPBasicAuth(username, password) if username and password else None
|
||||
|
||||
# Iterate resolved IPs (IPv4 + IPv6 candidates) just like espota2 does.
|
||||
for af, _socktype, _, _, sa in addr_infos:
|
||||
ip = sa[0]
|
||||
# IPv6 literals must be wrapped in brackets in URLs; link-local
|
||||
# addresses need a percent-encoded zone index per RFC 6874.
|
||||
if af == socket.AF_INET6:
|
||||
scope = sa[3] if len(sa) >= 4 else 0
|
||||
host_part = f"[{ip}%25{scope}]" if scope else f"[{ip}]"
|
||||
else:
|
||||
host_part = ip
|
||||
url = f"http://{host_part}:{port}{OTA_PATH}"
|
||||
for ip, url in addr_urls:
|
||||
_LOGGER.info("Connecting to %s port %s...", ip, port)
|
||||
|
||||
try:
|
||||
|
||||
@@ -14,7 +14,7 @@ import pytest
|
||||
from esphome import helpers
|
||||
from esphome.address_cache import AddressCache
|
||||
from esphome.core import CORE, EsphomeError
|
||||
from esphome.helpers import ProgressBar
|
||||
from esphome.helpers import ProgressBar, format_ip_url
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -151,6 +151,22 @@ def test_is_ip_address__invalid(host):
|
||||
assert actual is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("family", "sockaddr", "expected"),
|
||||
(
|
||||
(socket.AF_INET, ("192.168.1.5", 80), "http://192.168.1.5:80/events"),
|
||||
(socket.AF_INET6, ("2001:db8::1", 80, 0, 0), "http://[2001:db8::1]:80/events"),
|
||||
(
|
||||
socket.AF_INET6,
|
||||
("fe80::1", 8080, 0, 7),
|
||||
"http://[fe80::1%257]:8080/events",
|
||||
),
|
||||
),
|
||||
)
|
||||
def test_format_ip_url(family, sockaddr, expected):
|
||||
assert format_ip_url(family, sockaddr, sockaddr[1], "/events") == expected
|
||||
|
||||
|
||||
@settings(deadline=None)
|
||||
@given(value=ip_addresses(v=4).map(str))
|
||||
def test_is_ip_address__valid(value):
|
||||
|
||||
@@ -49,6 +49,7 @@ from esphome.__main__ import (
|
||||
has_non_ip_address,
|
||||
has_ota,
|
||||
has_resolvable_address,
|
||||
has_web_server_logging,
|
||||
has_web_server_ota,
|
||||
mqtt_get_ip,
|
||||
parse_args,
|
||||
@@ -72,6 +73,7 @@ from esphome.const import (
|
||||
CONF_DISABLED,
|
||||
CONF_ESPHOME,
|
||||
CONF_LEVEL,
|
||||
CONF_LOG,
|
||||
CONF_LOG_TOPIC,
|
||||
CONF_LOGGER,
|
||||
CONF_MDNS,
|
||||
@@ -86,6 +88,7 @@ from esphome.const import (
|
||||
CONF_TOPIC,
|
||||
CONF_USE_ADDRESS,
|
||||
CONF_USERNAME,
|
||||
CONF_VERSION,
|
||||
CONF_WEB_SERVER,
|
||||
CONF_WIFI,
|
||||
KEY_CORE,
|
||||
@@ -734,6 +737,30 @@ def test_choose_upload_log_host_with_ota_device_with_api_config_logging() -> Non
|
||||
assert result == ["192.168.1.100"]
|
||||
|
||||
|
||||
def test_choose_upload_log_host_logging_web_server_only_ip() -> None:
|
||||
"""A web_server-only device with a static IP resolves to that IP for logs."""
|
||||
setup_core(config={CONF_WEB_SERVER: {}}, address="192.168.1.100")
|
||||
|
||||
result = choose_upload_log_host(
|
||||
default="OTA",
|
||||
check_default=None,
|
||||
purpose=Purpose.LOGGING,
|
||||
)
|
||||
assert result == ["192.168.1.100"]
|
||||
|
||||
|
||||
def test_choose_upload_log_host_logging_web_server_only_mdns() -> None:
|
||||
"""A web_server-only device with a .local name resolves to that hostname."""
|
||||
setup_core(config={CONF_WEB_SERVER: {}}, address="test.local")
|
||||
|
||||
result = choose_upload_log_host(
|
||||
default="OTA",
|
||||
check_default=None,
|
||||
purpose=Purpose.LOGGING,
|
||||
)
|
||||
assert result == ["test.local"]
|
||||
|
||||
|
||||
def test_choose_upload_log_host_logging_without_api_reports_missing_api() -> None:
|
||||
"""A resolvable device with only ota: fails logs with a missing-api message."""
|
||||
setup_core(
|
||||
@@ -773,6 +800,17 @@ def test_unresolved_default_error_unresolvable_keeps_dashboard_hint() -> None:
|
||||
assert "set 'use_address'" in msg
|
||||
|
||||
|
||||
def test_unresolved_default_error_logging_suggests_web_server() -> None:
|
||||
"""The missing-api log message lists web_server among the remediations."""
|
||||
setup_core(
|
||||
config={CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}]}, address="192.168.1.100"
|
||||
)
|
||||
|
||||
msg = _unresolved_default_error(Purpose.LOGGING, ["OTA"])
|
||||
assert "no 'api:' component is configured" in msg
|
||||
assert "'web_server:'" in msg
|
||||
|
||||
|
||||
def test_unresolved_default_error_upload_with_ota_is_generic() -> None:
|
||||
"""With ota: present the upload error stays generic, not transport-specific."""
|
||||
setup_core(
|
||||
@@ -2376,6 +2414,30 @@ def test_has_web_server_ota_returns_false_without_config() -> None:
|
||||
assert has_ota() is True
|
||||
|
||||
|
||||
def test_has_web_server_logging_default() -> None:
|
||||
"""has_web_server_logging is True for a default web_server (v2, log on)."""
|
||||
setup_core(config={CONF_WEB_SERVER: {}})
|
||||
assert has_web_server_logging() is True
|
||||
|
||||
|
||||
def test_has_web_server_logging_without_config() -> None:
|
||||
"""has_web_server_logging is False when web_server is not configured."""
|
||||
setup_core(config={CONF_API: {}})
|
||||
assert has_web_server_logging() is False
|
||||
|
||||
|
||||
def test_has_web_server_logging_v1_has_no_events_stream() -> None:
|
||||
"""has_web_server_logging is False for v1, which has no /events endpoint."""
|
||||
setup_core(config={CONF_WEB_SERVER: {CONF_VERSION: 1}})
|
||||
assert has_web_server_logging() is False
|
||||
|
||||
|
||||
def test_has_web_server_logging_respects_log_disabled() -> None:
|
||||
"""has_web_server_logging is False when the web_server log option is off."""
|
||||
setup_core(config={CONF_WEB_SERVER: {CONF_LOG: False}})
|
||||
assert has_web_server_logging() is False
|
||||
|
||||
|
||||
def test_upload_program_web_server_only_auto_dispatches(
|
||||
mock_run_web_server_ota: Mock,
|
||||
mock_run_ota: Mock,
|
||||
@@ -2945,6 +3007,77 @@ def test_show_logs_network_with_mqtt_only(
|
||||
)
|
||||
|
||||
|
||||
@patch("esphome.web_server_logs.run_logs")
|
||||
def test_show_logs_web_server(
|
||||
mock_run_logs: Mock,
|
||||
) -> None:
|
||||
"""A web_server-only device streams logs over the HTTP SSE endpoint."""
|
||||
setup_core(
|
||||
config={
|
||||
"logger": {},
|
||||
CONF_WEB_SERVER: {CONF_PORT: 80},
|
||||
# No API or MQTT configured
|
||||
},
|
||||
platform=PLATFORM_ESP32,
|
||||
)
|
||||
mock_run_logs.return_value = 0
|
||||
|
||||
result = show_logs(CORE.config, MockArgs(), ["192.168.1.100"])
|
||||
|
||||
assert result == 0
|
||||
mock_run_logs.assert_called_once_with(["192.168.1.100"], 80, None, None)
|
||||
|
||||
|
||||
@patch("esphome.web_server_logs.run_logs")
|
||||
def test_show_logs_web_server_with_auth_and_port(
|
||||
mock_run_logs: Mock,
|
||||
) -> None:
|
||||
"""web_server port and basic-auth credentials are forwarded to the streamer."""
|
||||
setup_core(
|
||||
config={
|
||||
"logger": {},
|
||||
CONF_WEB_SERVER: {
|
||||
CONF_PORT: 8080,
|
||||
CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "secret"},
|
||||
},
|
||||
},
|
||||
platform=PLATFORM_ESP32,
|
||||
)
|
||||
mock_run_logs.return_value = 0
|
||||
|
||||
result = show_logs(CORE.config, MockArgs(), ["192.168.1.100"])
|
||||
|
||||
assert result == 0
|
||||
mock_run_logs.assert_called_once_with(["192.168.1.100"], 8080, "admin", "secret")
|
||||
|
||||
|
||||
@patch("esphome.web_server_logs.run_logs")
|
||||
@patch("esphome.mqtt.show_logs")
|
||||
def test_show_logs_mqtt_preferred_over_web_server(
|
||||
mock_mqtt_show_logs: Mock,
|
||||
mock_run_logs: Mock,
|
||||
) -> None:
|
||||
"""With both MQTT logging and web_server, MQTT wins (API > MQTT > web_server)."""
|
||||
setup_core(
|
||||
config={
|
||||
"logger": {},
|
||||
"mqtt": {CONF_BROKER: "mqtt.local"},
|
||||
CONF_WEB_SERVER: {CONF_PORT: 80},
|
||||
},
|
||||
platform=PLATFORM_ESP32,
|
||||
)
|
||||
mock_mqtt_show_logs.return_value = 0
|
||||
|
||||
args = MockArgs(
|
||||
topic="esphome/logs", username="user", password="pass", client_id="client"
|
||||
)
|
||||
result = show_logs(CORE.config, args, ["192.168.1.100"])
|
||||
|
||||
assert result == 0
|
||||
mock_mqtt_show_logs.assert_called_once()
|
||||
mock_run_logs.assert_not_called()
|
||||
|
||||
|
||||
def test_show_logs_no_method_configured() -> None:
|
||||
"""Test show_logs when no remote logging method is configured."""
|
||||
setup_core(
|
||||
|
||||
64
tests/unit_tests/test_web_server_helpers.py
Normal file
64
tests/unit_tests/test_web_server_helpers.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Unit tests for esphome.web_server_helpers module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome.const import (
|
||||
CONF_AUTH,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME,
|
||||
CONF_WEB_SERVER,
|
||||
)
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.web_server_helpers import (
|
||||
get_web_server_connection,
|
||||
resolve_web_server_urls,
|
||||
)
|
||||
|
||||
|
||||
def test_resolve_web_server_urls_maps_ipv4_and_ipv6(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Each resolved address becomes an (ip, url) pair with IPv6 bracketing."""
|
||||
addr_infos = [
|
||||
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.5", 80)),
|
||||
(socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 80, 0, 7)),
|
||||
]
|
||||
monkeypatch.setattr(
|
||||
"esphome.web_server_helpers.resolve_ip_address",
|
||||
lambda *args, **kwargs: addr_infos,
|
||||
)
|
||||
|
||||
assert resolve_web_server_urls("dev.local", 80, "/events") == [
|
||||
("192.168.1.5", "http://192.168.1.5:80/events"),
|
||||
("fe80::1", "http://[fe80::1%257]:80/events"),
|
||||
]
|
||||
|
||||
|
||||
def test_get_web_server_connection_without_auth() -> None:
|
||||
"""Port is returned and credentials are None when no auth is configured."""
|
||||
config = {CONF_WEB_SERVER: {CONF_PORT: 80}}
|
||||
|
||||
assert get_web_server_connection(config) == (80, None, None)
|
||||
|
||||
|
||||
def test_get_web_server_connection_with_auth() -> None:
|
||||
"""Port and HTTP Basic credentials are returned when auth is configured."""
|
||||
config = {
|
||||
CONF_WEB_SERVER: {
|
||||
CONF_PORT: 8080,
|
||||
CONF_AUTH: {CONF_USERNAME: "admin", CONF_PASSWORD: "secret"},
|
||||
}
|
||||
}
|
||||
|
||||
assert get_web_server_connection(config) == (8080, "admin", "secret")
|
||||
|
||||
|
||||
def test_get_web_server_connection_missing_component() -> None:
|
||||
"""A config without web_server raises a clear error."""
|
||||
with pytest.raises(EsphomeError, match="web_server.*not configured"):
|
||||
get_web_server_connection({})
|
||||
397
tests/unit_tests/test_web_server_logs.py
Normal file
397
tests/unit_tests/test_web_server_logs.py
Normal file
@@ -0,0 +1,397 @@
|
||||
"""Unit tests for esphome.web_server_logs module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
import logging
|
||||
import socket
|
||||
from typing import Self
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from esphome import web_server_logs
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.web_server_logs import (
|
||||
EVENTS_PATH,
|
||||
WebServerLogsError,
|
||||
_build_urls,
|
||||
_consume,
|
||||
_stream,
|
||||
run_logs,
|
||||
)
|
||||
|
||||
# A realistic slice of the web_server /events SSE stream: an initial ping
|
||||
# carrying the config, a state frame, two log frames (one multi-line), plus
|
||||
# comment/id/retry lines that must be ignored.
|
||||
SSE_LINES = [
|
||||
"retry: 30000",
|
||||
"id: 12345",
|
||||
"event: ping",
|
||||
'data: {"title":"dev","log":true}',
|
||||
"",
|
||||
"event: state",
|
||||
'data: {"id":"sensor-x","state":"ON"}',
|
||||
"",
|
||||
"event: log",
|
||||
"data: \x1b[0;32m[I][main:001]: hello\x1b[0m",
|
||||
"",
|
||||
": keepalive-comment",
|
||||
"event: log",
|
||||
"data: line one",
|
||||
"data: line two",
|
||||
"",
|
||||
]
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
"""Minimal stand-in for a streamed ``requests`` response."""
|
||||
|
||||
def __init__(self, status_code: int, lines: list[str]) -> None:
|
||||
self.status_code = status_code
|
||||
self._lines = lines
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc: object) -> bool:
|
||||
return False
|
||||
|
||||
def iter_lines(self) -> Iterator[bytes]:
|
||||
for line in self._lines:
|
||||
yield line.encode("utf8")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_parser() -> MagicMock:
|
||||
"""A LogParser whose parse_line returns the raw line unchanged."""
|
||||
parser = MagicMock()
|
||||
parser.parse_line.side_effect = lambda line, time_str: line
|
||||
return parser
|
||||
|
||||
|
||||
def _patch_resolve(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
addr_infos: list[tuple[int, int, int, str, tuple]],
|
||||
) -> None:
|
||||
monkeypatch.setattr(
|
||||
"esphome.web_server_helpers.resolve_ip_address",
|
||||
lambda *args, **kwargs: addr_infos,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_urls
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_build_urls_ipv4(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""An IPv4 host resolves to a plain http://ip:port/events URL."""
|
||||
_patch_resolve(
|
||||
monkeypatch,
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.5", 80))],
|
||||
)
|
||||
|
||||
assert _build_urls(["dev.local"], 80) == [
|
||||
("192.168.1.5", f"http://192.168.1.5:80{EVENTS_PATH}")
|
||||
]
|
||||
|
||||
|
||||
def test_build_urls_ipv6_brackets_and_zone(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""IPv6 literals are bracketed; link-local addresses get a %25 zone index."""
|
||||
_patch_resolve(
|
||||
monkeypatch,
|
||||
[(socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 8080, 0, 7))],
|
||||
)
|
||||
|
||||
assert _build_urls(["dev.local"], 8080) == [
|
||||
("fe80::1", f"http://[fe80::1%257]:8080{EVENTS_PATH}")
|
||||
]
|
||||
|
||||
|
||||
def test_build_urls_dedups_and_skips_unresolvable(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Duplicate resolved IPs collapse to one URL; resolve errors are skipped."""
|
||||
calls: list[str] = []
|
||||
|
||||
def fake_resolve(host: str, port: int, **kwargs: object) -> list[tuple]:
|
||||
calls.append(host)
|
||||
if host == "bad":
|
||||
raise EsphomeError("nope")
|
||||
return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", port))]
|
||||
|
||||
monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", fake_resolve)
|
||||
|
||||
# "good" and "dup" both resolve to 10.0.0.1, "bad" raises.
|
||||
assert _build_urls(["good", "bad", "dup"], 80) == [
|
||||
("10.0.0.1", f"http://10.0.0.1:80{EVENTS_PATH}")
|
||||
]
|
||||
assert calls == ["good", "bad", "dup"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _consume (SSE parsing)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_consume_emits_only_log_frames(
|
||||
monkeypatch: pytest.MonkeyPatch, fake_parser: MagicMock
|
||||
) -> None:
|
||||
"""Only event: log data lines are printed; ping/state/comments are ignored."""
|
||||
printed: list[str] = []
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", printed.append)
|
||||
|
||||
_consume(_FakeResponse(200, SSE_LINES), fake_parser)
|
||||
|
||||
assert printed == [
|
||||
"\x1b[0;32m[I][main:001]: hello\x1b[0m",
|
||||
"line one",
|
||||
"line two",
|
||||
]
|
||||
|
||||
|
||||
def test_consume_ignores_unterminated_trailing_frame(
|
||||
monkeypatch: pytest.MonkeyPatch, fake_parser: MagicMock
|
||||
) -> None:
|
||||
"""A log frame without its terminating blank line is not emitted."""
|
||||
printed: list[str] = []
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", printed.append)
|
||||
|
||||
_consume(_FakeResponse(200, ["event: log", "data: dangling"]), fake_parser)
|
||||
|
||||
assert printed == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _stream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_stream_returns_false_when_connect_fails(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
fake_parser: MagicMock,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""A failed connection logs a warning and reports not-connected."""
|
||||
|
||||
def boom(*args: object, **kwargs: object) -> _FakeResponse:
|
||||
raise requests.ConnectionError("refused")
|
||||
|
||||
monkeypatch.setattr(requests, "get", boom)
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
assert (
|
||||
_stream("http://10.0.0.1:80/events", "10.0.0.1", None, fake_parser) is False
|
||||
)
|
||||
assert "Could not connect to 10.0.0.1" in caplog.text
|
||||
|
||||
|
||||
def test_stream_returns_true_when_established_then_dropped(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
fake_parser: MagicMock,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""A mid-stream drop after connecting reports connected so we reconnect."""
|
||||
printed: list[str] = []
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", printed.append)
|
||||
|
||||
class _DroppingResponse(_FakeResponse):
|
||||
def iter_lines(self) -> Iterator[bytes]:
|
||||
yield b"event: log"
|
||||
yield b"data: before-drop"
|
||||
yield b""
|
||||
raise requests.exceptions.ChunkedEncodingError("connection lost")
|
||||
|
||||
monkeypatch.setattr(requests, "get", lambda *a, **kw: _DroppingResponse(200, []))
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
assert (
|
||||
_stream("http://10.0.0.1:80/events", "10.0.0.1", None, fake_parser) is True
|
||||
)
|
||||
assert printed == ["before-drop"]
|
||||
assert "reconnecting" in caplog.text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_logs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_logs_streams_then_reconnects_until_interrupt(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""A dropped stream reconnects; KeyboardInterrupt during the pause exits 0."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
printed: list[str] = []
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", printed.append)
|
||||
monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(200, SSE_LINES))
|
||||
|
||||
def stop(_delay: float) -> None:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
monkeypatch.setattr(web_server_logs.time, "sleep", stop)
|
||||
|
||||
assert run_logs(["dev.local"], 80, None, None) == 0
|
||||
# The single stream was consumed before the reconnect pause interrupted us.
|
||||
# run_logs renders through the real LogParser, which prefixes a timestamp,
|
||||
# so assert on the payloads rather than exact equality.
|
||||
assert len(printed) == 3
|
||||
assert "[I][main:001]: hello" in printed[0]
|
||||
assert "line one" in printed[1]
|
||||
assert "line two" in printed[2]
|
||||
|
||||
|
||||
def test_run_logs_passes_basic_auth(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Username + password are forwarded as HTTP Basic auth on the request."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", lambda line: None)
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def fake_get(url: str, **kwargs: object) -> _FakeResponse:
|
||||
captured.update(kwargs)
|
||||
captured["url"] = url
|
||||
return _FakeResponse(200, SSE_LINES)
|
||||
|
||||
monkeypatch.setattr(requests, "get", fake_get)
|
||||
monkeypatch.setattr(
|
||||
web_server_logs.time,
|
||||
"sleep",
|
||||
lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()),
|
||||
)
|
||||
|
||||
assert run_logs(["dev.local"], 80, "admin", "secret") == 0
|
||||
auth = captured["auth"]
|
||||
assert isinstance(auth, HTTPBasicAuth)
|
||||
assert (auth.username, auth.password) == ("admin", "secret")
|
||||
assert captured["stream"] is True
|
||||
assert captured["headers"] == {"Accept": "text/event-stream"}
|
||||
|
||||
|
||||
def test_run_logs_no_auth_when_credentials_missing(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""No auth object is sent when username/password are not configured."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
monkeypatch.setattr(web_server_logs, "safe_print", lambda line: None)
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def fake_get(url: str, **kwargs: object) -> _FakeResponse:
|
||||
captured.update(kwargs)
|
||||
return _FakeResponse(200, SSE_LINES)
|
||||
|
||||
monkeypatch.setattr(requests, "get", fake_get)
|
||||
monkeypatch.setattr(
|
||||
web_server_logs.time,
|
||||
"sleep",
|
||||
lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()),
|
||||
)
|
||||
|
||||
assert run_logs(["dev.local"], 80, None, None) == 0
|
||||
assert captured["auth"] is None
|
||||
|
||||
|
||||
def test_run_logs_raises_on_auth_failure(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""HTTP 401 aborts with a clear error rather than reconnecting forever."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(401, []))
|
||||
|
||||
with pytest.raises(WebServerLogsError, match="Authentication failed"):
|
||||
run_logs(["dev.local"], 80, "admin", "bad")
|
||||
|
||||
|
||||
def test_run_logs_retries_on_transient_status(
|
||||
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""A transient non-200 (e.g. 503) is logged and the loop retries."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(503, []))
|
||||
monkeypatch.setattr(
|
||||
web_server_logs.time,
|
||||
"sleep",
|
||||
lambda _d: (_ for _ in ()).throw(KeyboardInterrupt()),
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.ERROR):
|
||||
assert run_logs(["dev.local"], 80, None, None) == 0
|
||||
assert "Unexpected HTTP 503" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", (403, 404))
|
||||
def test_run_logs_raises_on_permanent_status(
|
||||
monkeypatch: pytest.MonkeyPatch, status: int
|
||||
) -> None:
|
||||
"""A permanent 403/404 aborts instead of retrying the endpoint forever."""
|
||||
monkeypatch.setattr(
|
||||
web_server_logs,
|
||||
"_build_urls",
|
||||
lambda hosts, port: [("10.0.0.1", "http://10.0.0.1:80/events")],
|
||||
)
|
||||
monkeypatch.setattr(requests, "get", lambda *a, **kw: _FakeResponse(status, []))
|
||||
|
||||
with pytest.raises(WebServerLogsError, match=str(status)):
|
||||
run_logs(["dev.local"], 80, None, None)
|
||||
|
||||
|
||||
def test_run_logs_backs_off_on_repeated_failure(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Consecutive unreachable attempts grow the reconnect delay up to the cap."""
|
||||
monkeypatch.setattr(web_server_logs, "_build_urls", lambda hosts, port: [])
|
||||
delays: list[float] = []
|
||||
|
||||
def record(delay: float) -> None:
|
||||
delays.append(delay)
|
||||
if len(delays) >= 4:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
monkeypatch.setattr(web_server_logs.time, "sleep", record)
|
||||
|
||||
assert run_logs(["dev.local"], 80, None, None) == 0
|
||||
# 1 -> 2 -> 4 -> 8 ... doubling, capped at MAX_RECONNECT_DELAY (10.0).
|
||||
assert delays == [2.0, 4.0, 8.0, 10.0]
|
||||
|
||||
|
||||
def test_run_logs_reports_unresolvable(
|
||||
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""When no host resolves, an error is logged and the loop pauses/retries."""
|
||||
monkeypatch.setattr(web_server_logs, "_build_urls", lambda hosts, port: [])
|
||||
|
||||
# Let the first reconnect pause pass so the loop continues, then interrupt
|
||||
# on the second so the retry path (the ``continue``) is exercised.
|
||||
sleeps = {"n": 0}
|
||||
|
||||
def sleep(_delay: float) -> None:
|
||||
sleeps["n"] += 1
|
||||
if sleeps["n"] >= 2:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
monkeypatch.setattr(web_server_logs.time, "sleep", sleep)
|
||||
|
||||
with caplog.at_level(logging.ERROR):
|
||||
assert run_logs(["dev.local"], 80, None, None) == 0
|
||||
assert sleeps["n"] == 2
|
||||
assert "Could not resolve" in caplog.text
|
||||
@@ -46,7 +46,7 @@ def _patch_resolve(
|
||||
for host, port in hosts
|
||||
]
|
||||
monkeypatch.setattr(
|
||||
"esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
"esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
)
|
||||
|
||||
|
||||
@@ -475,7 +475,7 @@ def test_run_ota_resolution_failure(
|
||||
def _raise(*_args, **_kwargs):
|
||||
raise EsphomeError("dns failed")
|
||||
|
||||
monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _raise)
|
||||
monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _raise)
|
||||
|
||||
exit_code, host = run_ota(["does.not.exist"], 80, None, None, firmware)
|
||||
|
||||
@@ -491,7 +491,7 @@ def test_run_ota_resolution_failure_dashboard_mode(
|
||||
def _raise(*_args, **_kwargs):
|
||||
raise EsphomeError("dns failed")
|
||||
|
||||
monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _raise)
|
||||
monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _raise)
|
||||
monkeypatch.setattr(CORE, "dashboard", True)
|
||||
try:
|
||||
exit_code, host = run_ota(["does.not.exist"], 80, None, None, firmware)
|
||||
@@ -541,7 +541,7 @@ def test_run_ota_multiple_hosts_first_fails(
|
||||
def _resolve(host, port, address_cache=None): # noqa: ARG001
|
||||
return addr_lookup[host]
|
||||
|
||||
monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _resolve)
|
||||
monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _resolve)
|
||||
|
||||
with patch(
|
||||
"esphome.web_server_ota.requests.post",
|
||||
@@ -570,7 +570,7 @@ def test_run_ota_all_hosts_return_failure_no_exception(
|
||||
def _resolve(host, port, address_cache=None): # noqa: ARG001
|
||||
return addr_lookup[host]
|
||||
|
||||
monkeypatch.setattr("esphome.web_server_ota.resolve_ip_address", _resolve)
|
||||
monkeypatch.setattr("esphome.web_server_helpers.resolve_ip_address", _resolve)
|
||||
|
||||
exit_code, host = run_ota(["a.local", "b.local"], 80, None, None, firmware)
|
||||
|
||||
@@ -633,7 +633,7 @@ def test_run_ota_ipv6_url_brackets_host(
|
||||
(socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("2001:db8::1", 80, 0, 0)),
|
||||
]
|
||||
monkeypatch.setattr(
|
||||
"esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
"esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
)
|
||||
|
||||
with patch(
|
||||
@@ -656,7 +656,7 @@ def test_run_ota_ipv6_link_local_includes_scope_id(
|
||||
(socket.AF_INET6, socket.SOCK_STREAM, 0, "", ("fe80::1", 80, 0, 3)),
|
||||
]
|
||||
monkeypatch.setattr(
|
||||
"esphome.web_server_ota.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
"esphome.web_server_helpers.resolve_ip_address", lambda *a, **kw: addr_infos
|
||||
)
|
||||
|
||||
with patch(
|
||||
|
||||
Reference in New Issue
Block a user