mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 15:10:51 +00:00
153 lines
5.6 KiB
Python
153 lines
5.6 KiB
Python
"""End-to-end OTA tests on the host platform.
|
|
|
|
Exercises the native OTA protocol against a real host binary, then asserts
that the pid is preserved across the post-OTA execv. A second OTA on the
post-exec instance covers the FD_CLOEXEC path.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Generator
|
|
from contextlib import contextmanager
|
|
import socket
|
|
|
|
import pytest
|
|
|
|
from esphome import espota2
|
|
|
|
from .conftest import run_binary, wait_and_connect_api_client
|
|
from .const import LOCALHOST, PORT_POLL_INTERVAL, PORT_WAIT_TIMEOUT
|
|
from .types import CompileFunction, ConfigWriter
|
|
|
|
# Device name that the tests below expect ``device_info()`` to report.
DEVICE_NAME = "host-ota-test"
|
|
|
|
|
|
@contextmanager
def _reserve_port() -> Generator[tuple[int, socket.socket]]:
    """Reserve an ephemeral TCP port and yield ``(port, socket)``.

    The socket is bound to port 0 on all interfaces so the OS assigns a free
    port. It stays open, holding the port, until the caller closes it or the
    ``with`` block exits, whichever happens first.

    Yields:
        A tuple of the port number assigned by the OS and the bound socket.
    """
    # Using the socket as a context manager closes it even when setsockopt()
    # or bind() raises, not only after a successful yield.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", 0))
        yield s.getsockname()[1], s
|
|
|
|
|
|
async def _wait_for_port(host: str, port: int, timeout: float) -> None:
    """Poll until a TCP port accepts connections, or raise TimeoutError.

    Args:
        host: Host name or address to connect to.
        port: TCP port to probe.
        timeout: Seconds after which no new connection attempt is started.

    Raises:
        TimeoutError: If no connection attempt succeeded before the deadline.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        try:
            _, writer = await asyncio.open_connection(host, port)
        except OSError:
            # ConnectionRefusedError is a subclass of OSError, so a single
            # clause covers "nothing listening yet" and other socket errors.
            await asyncio.sleep(PORT_POLL_INTERVAL)
            continue
        # The port is open; the probe connection itself is not needed.
        writer.close()
        await writer.wait_closed()
        return
    raise TimeoutError(f"Port {port} on {host} did not open within {timeout}s")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_host_ota_self_update(
    yaml_config: str,
    write_yaml_config: ConfigWriter,
    compile_esphome: CompileFunction,
    reserved_tcp_port: tuple[int, socket.socket],
) -> None:
    """Upload the running binary to itself via OTA and expect an in-place re-exec.

    The test compiles the config, starts the binary, uploads that same binary
    through ``espota2.run_ota`` and waits for the "OTA staged at" and
    "Rebooting safely" log lines. It then checks that the process is still
    alive with the same pid and still reports ``DEVICE_NAME``. A second upload
    against the post-exec instance must also succeed.
    """
    api_port, api_socket = reserved_tcp_port
    with _reserve_port() as (ota_port, ota_socket):
        config_path = await write_yaml_config(
            yaml_config.replace("__OTA_PORT__", str(ota_port))
        )
        binary_path = await compile_esphome(config_path)
        # Give up both reserved ports right before launch -- presumably so the
        # binary can bind them itself.
        api_socket.close()
        ota_socket.close()

        loop = asyncio.get_running_loop()
        ota_staged = loop.create_future()
        rebooted = loop.create_future()
        # Log substring -> future resolved the first time that substring shows up.
        log_markers = (
            ("OTA staged at", ota_staged),
            ("Rebooting safely", rebooted),
        )

        def on_log(line: str) -> None:
            for marker, seen in log_markers:
                if marker in line and not seen.done():
                    seen.set_result(True)

        async def upload_self():
            # espota2 is blocking; run in executor.
            rc, _ = await loop.run_in_executor(
                None, espota2.run_ota, LOCALHOST, ota_port, None, binary_path
            )
            return rc

        async with run_binary(binary_path, line_callback=on_log) as (proc, _lines):
            await _wait_for_port(LOCALHOST, api_port, PORT_WAIT_TIMEOUT)
            original_pid = proc.pid
            async with wait_and_connect_api_client(port=api_port) as client:
                info_before = await client.device_info()
                assert info_before.name == DEVICE_NAME

            assert await upload_self() == 0, "espota2 reported failure"

            for milestone in (ota_staged, rebooted):
                await asyncio.wait_for(milestone, timeout=10.0)
            await _wait_for_port(LOCALHOST, api_port, PORT_WAIT_TIMEOUT)

            # execv preserves pid; mismatch means external respawn.
            assert proc.returncode is None, "process exited instead of execing"
            assert proc.pid == original_pid

            async with wait_and_connect_api_client(port=api_port) as client:
                info_after = await client.device_info()
                assert info_after.name == DEVICE_NAME
                assert info_after.name == info_before.name

            # Second OTA: catches FD_CLOEXEC regressions (EADDRINUSE on rebind).
            assert await upload_self() == 0, (
                "second OTA failed -- listener leaked across execv"
            )
            await _wait_for_port(LOCALHOST, api_port, PORT_WAIT_TIMEOUT)
            assert proc.pid == original_pid
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_host_ota_rejects_garbage(
    yaml_config: str,
    write_yaml_config: ConfigWriter,
    compile_esphome: CompileFunction,
    reserved_tcp_port: tuple[int, socket.socket],
    integration_test_dir,
) -> None:
    """Upload a non-executable payload and check the device survives it.

    ``espota2.run_ota`` must return 1 for the bogus file. After a short pause
    the process must still be running under the same pid and still answer
    ``device_info()`` with ``DEVICE_NAME``.
    """
    api_port, api_socket = reserved_tcp_port
    with _reserve_port() as (ota_port, ota_socket):
        config_path = await write_yaml_config(
            yaml_config.replace("__OTA_PORT__", str(ota_port))
        )
        binary_path = await compile_esphome(config_path)

        # 192 bytes that are neither ELF nor Mach-O.
        payload = b"NOT-AN-EXECUTABLE-AT-ALL" * 8
        bogus_path = integration_test_dir / "bogus.bin"
        bogus_path.write_bytes(payload)

        # Give up both reserved ports right before launch -- presumably so the
        # binary can bind them itself.
        api_socket.close()
        ota_socket.close()

        async with run_binary(binary_path) as (proc, _lines):
            await _wait_for_port(LOCALHOST, api_port, PORT_WAIT_TIMEOUT)
            pid_at_start = proc.pid

            # espota2.run_ota is a plain function, so it runs in the default
            # executor instead of being called on the event loop.
            upload = asyncio.get_running_loop().run_in_executor(
                None, espota2.run_ota, LOCALHOST, ota_port, None, bogus_path
            )
            rc, _ = await upload
            assert rc == 1

            # Brief pause before checking that the process is still alive.
            await asyncio.sleep(0.5)
            assert proc.returncode is None, "process died on rejected OTA"
            assert proc.pid == pid_at_start

            async with wait_and_connect_api_client(port=api_port) as client:
                info = await client.device_info()
                assert info.name == DEVICE_NAME
|