mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 09:57:43 +00:00
2
Doxyfile
2
Doxyfile
@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
|
||||
# could be handy for archiving the generated documentation or if some version
|
||||
# control system is used.
|
||||
|
||||
PROJECT_NUMBER = 2026.6.0
|
||||
PROJECT_NUMBER = 2026.6.1
|
||||
|
||||
# Using the PROJECT_BRIEF tag one can provide an optional one line description
|
||||
# for a project that appears at the top of each page and should give viewer a
|
||||
|
||||
@@ -32,7 +32,7 @@ RUN \
|
||||
-r /requirements.txt
|
||||
|
||||
# Install the ESPHome Device Builder dashboard.
|
||||
RUN uv pip install --no-cache-dir esphome-device-builder==1.0.9
|
||||
RUN uv pip install --no-cache-dir esphome-device-builder==1.0.10
|
||||
|
||||
RUN \
|
||||
platformio settings set enable_telemetry No \
|
||||
|
||||
@@ -504,6 +504,12 @@ def has_resolvable_address() -> bool:
|
||||
if has_ip_address():
|
||||
return True
|
||||
|
||||
# The dashboard pre-resolves the device and passes the IPs via
|
||||
# --mdns-address-cache/--dns-address-cache; honor a cached address even when the
|
||||
# device has mDNS disabled (e.g. a .local host found via ping).
|
||||
if CORE.address_cache and CORE.address_cache.get_addresses(CORE.address):
|
||||
return True
|
||||
|
||||
if has_mdns():
|
||||
return True
|
||||
|
||||
@@ -1765,6 +1771,21 @@ def command_update_all(args: ArgsProtocol) -> int | None:
|
||||
def command_idedata(args: ArgsProtocol, config: ConfigType) -> int:
|
||||
import json
|
||||
|
||||
if CORE.using_toolchain_esp_idf:
|
||||
# Native ESP-IDF derives idedata from the build's compile_commands.json,
|
||||
# so the configuration must already be compiled.
|
||||
from esphome.espidf import toolchain as espidf_toolchain
|
||||
|
||||
idedata = espidf_toolchain.get_idedata()
|
||||
if idedata is None:
|
||||
_LOGGER.error(
|
||||
"No idedata available; compile the configuration first",
|
||||
)
|
||||
return 1
|
||||
|
||||
print(json.dumps(idedata, indent=2) + "\n")
|
||||
return 0
|
||||
|
||||
if not CORE.using_toolchain_platformio:
|
||||
_LOGGER.error(
|
||||
"The idedata command is not compatible with %s toolchain",
|
||||
|
||||
@@ -224,6 +224,17 @@ def merge_factory_bin(source, target, env):
|
||||
flash_size = env.BoardConfig().get("upload.flash_size", "4MB")
|
||||
chip = env.BoardConfig().get("build.mcu", "esp32")
|
||||
|
||||
# PlatformIO's esp-idf builder already creates a correct firmware.factory.bin (right
|
||||
# artifact names and partition offsets, including custom partition tables). The merge
|
||||
# below is only a fallback and cannot honor custom layouts, so don't overwrite an image
|
||||
# PlatformIO already produced. Post-build actions only run when firmware.bin is rebuilt,
|
||||
# and PlatformIO's combined-image builder runs before us in that batch, so an existing
|
||||
# file here is current.
|
||||
output_path = firmware_path.with_suffix(".factory.bin")
|
||||
if output_path.exists():
|
||||
print(f"{output_path.name} already created by PlatformIO - skipping merge")
|
||||
return
|
||||
|
||||
sections = []
|
||||
flasher_args_path = build_dir / "flasher_args.json"
|
||||
|
||||
@@ -291,7 +302,6 @@ def merge_factory_bin(source, target, env):
|
||||
print("No valid flash sections found — skipping .factory.bin creation.")
|
||||
return
|
||||
|
||||
output_path = firmware_path.with_suffix(".factory.bin")
|
||||
python_exe = f'"{env.subst("$PYTHONEXE")}"'
|
||||
cmd = [
|
||||
python_exe,
|
||||
|
||||
@@ -175,6 +175,10 @@ void Logger::process_messages_() {
|
||||
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
|
||||
// Process any buffered messages when available
|
||||
if (this->log_buffer_.has_messages()) {
|
||||
// Prevent main-task logs emitted by listener callbacks (e.g. the API send path) from re-entering
|
||||
// and corrupting the shared tx_buffer_ / API shared_write_buffer_ while we are draining here.
|
||||
// Mirrors the guard held by log_message_to_buffer_and_send_ on the synchronous logging path.
|
||||
RecursionGuard guard(this->main_task_recursion_guard_);
|
||||
logger::TaskLogBuffer::LogMessage *message;
|
||||
uint16_t text_length;
|
||||
while (this->log_buffer_.borrow_message_main_loop(message, text_length)) {
|
||||
|
||||
@@ -4,7 +4,7 @@ import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_TIME_ID,
|
||||
DEVICE_CLASS_DURATION,
|
||||
DEVICE_CLASS_UPTIME,
|
||||
DEVICE_CLASS_TIMESTAMP,
|
||||
ENTITY_CATEGORY_DIAGNOSTIC,
|
||||
ICON_TIMER,
|
||||
STATE_CLASS_TOTAL_INCREASING,
|
||||
@@ -33,8 +33,9 @@ CONFIG_SCHEMA = cv.typed_schema(
|
||||
).extend(cv.polling_component_schema("60s")),
|
||||
"timestamp": sensor.sensor_schema(
|
||||
UptimeTimestampSensor,
|
||||
icon=ICON_TIMER,
|
||||
accuracy_decimals=0,
|
||||
device_class=DEVICE_CLASS_UPTIME,
|
||||
device_class=DEVICE_CLASS_TIMESTAMP,
|
||||
entity_category=ENTITY_CATEGORY_DIAGNOSTIC,
|
||||
)
|
||||
.extend(
|
||||
|
||||
@@ -4,7 +4,7 @@ from enum import Enum
|
||||
|
||||
from esphome.enum import StrEnum
|
||||
|
||||
__version__ = "2026.6.0"
|
||||
__version__ = "2026.6.1"
|
||||
|
||||
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
||||
VALID_SUBSTITUTIONS_CHARACTERS = (
|
||||
|
||||
@@ -472,6 +472,7 @@ def get_idedata() -> dict | None:
|
||||
pass
|
||||
|
||||
data = idedata_from_build(compile_commands)
|
||||
data["prog_path"] = str(get_elf_path())
|
||||
cache.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
|
||||
return data
|
||||
|
||||
@@ -70,12 +70,15 @@ def populate_dependency_config(
|
||||
|
||||
* ``domain.platform`` form (e.g. ``sensor.gpio``) appends
|
||||
``{platform: <name>}`` to ``config[domain]``, creating the list if needed.
|
||||
* Bare components are looked up via ``get_component_fn``. Platform
|
||||
components (``IS_PLATFORM_COMPONENT``) and ``MULTI_CONF`` components are
|
||||
initialised as ``[]`` so the sibling ``domain.platform`` branch can
|
||||
``append`` into them. Everything else is populated by running the
|
||||
component's schema with ``{}`` so defaults exist; if the schema requires
|
||||
explicit input, an empty ``{}`` is used as a fallback.
|
||||
* Bare components are looked up via ``get_component_fn``. Target-platform
|
||||
components (``is_target_platform``, e.g. ``esp32``) are skipped entirely:
|
||||
a host build targets ``host``, so a foreign target platform's sources are
|
||||
guarded out and its schema must not run here (it would mutate global CORE
|
||||
state as a side effect). Platform components (``IS_PLATFORM_COMPONENT``)
|
||||
and ``MULTI_CONF`` components are initialised as ``[]`` so the sibling
|
||||
``domain.platform`` branch can ``append`` into them. Everything else is
|
||||
populated by running the component's schema with ``{}`` so defaults exist;
|
||||
if the schema requires explicit input, an empty ``{}`` is used as a fallback.
|
||||
|
||||
Platform components must always be a list here even when no
|
||||
``domain.platform`` entry follows, because the ``domain.platform`` branch
|
||||
@@ -96,6 +99,12 @@ def populate_dependency_config(
|
||||
component = get_component_fn(component_name)
|
||||
if component is None:
|
||||
continue
|
||||
# Skip target platforms (e.g. esp32): a host build targets `host`, so a
|
||||
# foreign target's sources are guarded out, and running its schema with
|
||||
# {} leaks global CORE state (esp32 pins CORE.toolchain to ESP-IDF),
|
||||
# crashing the host compile. See #17035.
|
||||
if component.is_target_platform:
|
||||
continue
|
||||
if component.multi_conf or component.is_platform_component:
|
||||
config.setdefault(component_name, [])
|
||||
elif component_name not in config:
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
esphome:
|
||||
name: logger-recursion-test
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: DEBUG
|
||||
on_message:
|
||||
# Fires on the main loop for every message delivered to listeners, including
|
||||
# messages drained from the task log buffer (i.e. logged from a non-main thread).
|
||||
# The lambda logs again on the main task. Without a recursion guard on the buffered
|
||||
# drain path this re-entrant log reuses the shared tx_buffer_ and clobbers the
|
||||
# buffered message that is still being delivered, corrupting its console output.
|
||||
- level: VERY_VERBOSE
|
||||
then:
|
||||
- lambda: |-
|
||||
ESP_LOGD("reentry", "REENTRANT_CLOBBER_MARKER");
|
||||
|
||||
button:
|
||||
- platform: template
|
||||
name: "Start Race Test"
|
||||
id: start_test_button
|
||||
on_press:
|
||||
- lambda: |-
|
||||
// Keep the count well under the host task-log-buffer slot count so every
|
||||
// message goes through the ring buffer (buffered drain path) instead of the
|
||||
// emergency console fallback. The main loop is blocked in pthread_join while
|
||||
// the thread logs, so all messages are drained together once it returns.
|
||||
static const int NUM_MESSAGES = 30;
|
||||
|
||||
struct ThreadTest {
|
||||
static void *thread_func(void *arg) {
|
||||
char thread_name[16];
|
||||
snprintf(thread_name, sizeof(thread_name), "LogThread");
|
||||
#ifdef __APPLE__
|
||||
pthread_setname_np(thread_name);
|
||||
#else
|
||||
pthread_setname_np(pthread_self(), thread_name);
|
||||
#endif
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||
// Verifiable payload: data is a deterministic function of the message
|
||||
// index, so a clobbered buffer shows up as a missing or mismatched line.
|
||||
ESP_LOGD("thread_test", "THREADMSG%03d_DATA_%08X", i, i * 12345);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
// RACE_TEST_START / RACE_TEST_COMPLETE are logged from the main task (the
|
||||
// synchronous path, which already holds the recursion guard) so the test can
|
||||
// always detect completion even when the buffered path is corrupted.
|
||||
ESP_LOGI("thread_test", "RACE_TEST_START: logging %d messages from a thread", NUM_MESSAGES);
|
||||
|
||||
pthread_t thread;
|
||||
if (pthread_create(&thread, nullptr, ThreadTest::thread_func, nullptr) != 0) {
|
||||
ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread");
|
||||
return;
|
||||
}
|
||||
pthread_join(thread, nullptr);
|
||||
|
||||
ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: thread finished, expected %d messages", NUM_MESSAGES);
|
||||
119
tests/integration/test_logger_buffered_recursion_guard.py
Normal file
119
tests/integration/test_logger_buffered_recursion_guard.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Integration test for the recursion guard on the buffered logger drain path.
|
||||
|
||||
Regression test for a crash where a log message drained from the task log buffer
|
||||
(i.e. logged from a non-main thread) re-entered the logger on the main task while it
|
||||
was still being delivered to listeners. The buffered drain in
|
||||
``Logger::process_messages_`` did not hold the main-task recursion guard that the
|
||||
synchronous logging path holds, so a listener callback that logged again on the main
|
||||
task (e.g. the API log-forwarding path, or a ``logger.on_message`` automation) reused
|
||||
the shared ``tx_buffer_`` and clobbered the message mid-delivery. On ESP32 this showed
|
||||
up as a ``StoreProhibited`` panic inside the API send path.
|
||||
|
||||
The fixture logs a small batch of verifiable messages from a non-main thread (kept
|
||||
under the host task-log-buffer slot count so they all take the buffered drain path
|
||||
rather than the emergency console fallback) while an ``on_message`` automation re-logs
|
||||
``REENTRANT_CLOBBER_MARKER`` on the main task for every delivered message.
|
||||
|
||||
Without the guard the re-entrant marker is written into the shared ``tx_buffer_`` while
|
||||
the buffered thread message is still being delivered, so the message the API receives is
|
||||
contaminated (it contains the marker and an embedded newline glued onto the thread
|
||||
payload). With the guard the re-entrant log is dropped during the drain, the marker
|
||||
never appears, and every thread message is delivered clean.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
from aioesphomeapi import LogLevel
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
_ANSI = re.compile(r"\x1b\[[0-9;]*m")
|
||||
# THREADMSGnnn_DATA_xxxxxxxx where data is a deterministic checksum of the index
|
||||
THREAD_MSG_PATTERN = re.compile(r"THREADMSG(\d{3})_DATA_([0-9A-F]{8})")
|
||||
|
||||
NUM_MESSAGES = 30
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logger_buffered_recursion_guard(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Buffered (non-main-thread) log messages survive a re-entrant main-task log."""
|
||||
api_messages: list[str] = []
|
||||
all_drained = asyncio.Event()
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "logger-recursion-test"
|
||||
|
||||
# Subscribe over the API: this is the exact path that crashed in the field
|
||||
# (the API log callback runs during the buffered drain). The API message field
|
||||
# preserves embedded newlines, so it reliably exposes a clobbered buffer.
|
||||
#
|
||||
# Every buffered thread message is delivered here whether it survives intact or
|
||||
# gets clobbered (a clobbered message still carries its THREADMSG payload), so
|
||||
# counting THREADMSG occurrences is a deterministic "drain complete" signal: no
|
||||
# arbitrary sleep, no dependence on the fix being present.
|
||||
def on_log(msg) -> None:
|
||||
text = msg.message.decode("utf-8", errors="replace")
|
||||
api_messages.append(text)
|
||||
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
|
||||
if received >= NUM_MESSAGES:
|
||||
all_drained.set()
|
||||
|
||||
client.subscribe_logs(on_log, log_level=LogLevel.LOG_LEVEL_VERY_VERBOSE)
|
||||
|
||||
entities, _ = await client.list_entities_services()
|
||||
buttons = [e for e in entities if e.name == "Start Race Test"]
|
||||
assert buttons, "Could not find Start Race Test button"
|
||||
client.button_command(buttons[0].key)
|
||||
|
||||
# Wait until every buffered thread message has been delivered over the API.
|
||||
try:
|
||||
await asyncio.wait_for(all_drained.wait(), timeout=30.0)
|
||||
except TimeoutError:
|
||||
received = sum(len(THREAD_MSG_PATTERN.findall(m)) for m in api_messages)
|
||||
pytest.fail(
|
||||
f"Only {received}/{NUM_MESSAGES} thread messages arrived before timeout; "
|
||||
"device likely crashed or hung."
|
||||
)
|
||||
|
||||
intact: set[int] = set()
|
||||
contaminated: list[str] = []
|
||||
for raw in api_messages:
|
||||
text = _ANSI.sub("", raw)
|
||||
if "THREADMSG" not in text:
|
||||
continue
|
||||
# A clean thread message is a single line carrying only its own payload. A
|
||||
# clobbered buffer glues the re-entrant marker (and an embedded newline) onto it.
|
||||
if "REENTRANT" in text or "\n" in text:
|
||||
contaminated.append(repr(raw))
|
||||
continue
|
||||
match = THREAD_MSG_PATTERN.search(text)
|
||||
assert match, f"Unexpected thread message format: {raw!r}"
|
||||
msg_num = int(match.group(1))
|
||||
expected = f"{msg_num * 12345:08X}"
|
||||
if match.group(2) != expected:
|
||||
contaminated.append(repr(raw))
|
||||
continue
|
||||
intact.add(msg_num)
|
||||
|
||||
assert not contaminated, (
|
||||
"Buffered thread messages were clobbered by a re-entrant main-task log "
|
||||
"(missing recursion guard on the buffered drain path):\n"
|
||||
+ "\n".join(contaminated[:10])
|
||||
)
|
||||
assert len(intact) == NUM_MESSAGES, (
|
||||
f"Expected {NUM_MESSAGES} intact buffered thread messages over the API, got "
|
||||
f"{len(intact)}. Missing ids: {sorted(set(range(NUM_MESSAGES)) - intact)}"
|
||||
)
|
||||
76
tests/script/test_build_helpers.py
Normal file
76
tests/script/test_build_helpers.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""Unit tests for script/build_helpers.py."""
|
||||
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
# Add the script directory to the path so we can import build_helpers.
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "script"))
|
||||
|
||||
import build_helpers # noqa: E402
|
||||
|
||||
from esphome.core import CORE # noqa: E402
|
||||
|
||||
|
||||
class _FakeComponent:
|
||||
def __init__(self, config_schema, *, is_target_platform=False):
|
||||
self.multi_conf = False
|
||||
self.is_platform_component = False
|
||||
self.is_target_platform = is_target_platform
|
||||
self.config_schema = config_schema
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _restore_core_toolchain():
|
||||
"""Keep CORE.toolchain changes from leaking between tests."""
|
||||
saved = CORE.toolchain
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
CORE.toolchain = saved
|
||||
|
||||
|
||||
def test_populate_dependency_config_skips_target_platforms() -> None:
|
||||
"""Target-platform deps must be skipped, not config-populated, in a host build.
|
||||
|
||||
Regression test for #17035: esp32 (a target platform) appears only as a
|
||||
transitive dependency of a host C++ unit test. Running its schema with {}
|
||||
set ``CORE.toolchain = ESP_IDF`` as a side effect before failing validation,
|
||||
which crashed the host compile with KeyError('esp32'). The fix skips
|
||||
target-platform components entirely so their schema never runs.
|
||||
"""
|
||||
CORE.toolchain = None # the state a host build starts from
|
||||
schema_calls = []
|
||||
|
||||
def leaky_schema(value):
|
||||
# If this ever runs for a target platform, the bug is back.
|
||||
schema_calls.append(value)
|
||||
CORE.toolchain = "esp-idf-leak"
|
||||
raise ValueError("no board or variant")
|
||||
|
||||
config: dict = {}
|
||||
build_helpers.populate_dependency_config(
|
||||
config,
|
||||
["esp32"],
|
||||
get_component_fn=lambda name: _FakeComponent(
|
||||
leaky_schema, is_target_platform=True
|
||||
),
|
||||
register_platform_fn=lambda domain: None,
|
||||
)
|
||||
|
||||
assert "esp32" not in config # skipped: no synthesized entry
|
||||
assert schema_calls == [] # schema never run
|
||||
assert CORE.toolchain is None # no global side effect leaked
|
||||
|
||||
|
||||
def test_populate_dependency_config_populates_defaults() -> None:
|
||||
"""A non-target-platform dep still has its schema defaults harvested."""
|
||||
config: dict = {}
|
||||
build_helpers.populate_dependency_config(
|
||||
config,
|
||||
["ok"],
|
||||
get_component_fn=lambda name: _FakeComponent(lambda value: {"default": 1}),
|
||||
register_platform_fn=lambda domain: None,
|
||||
)
|
||||
assert config["ok"] == {"default": 1}
|
||||
@@ -266,11 +266,13 @@ def _make_component_stub(
|
||||
*,
|
||||
multi_conf: bool = False,
|
||||
is_platform_component: bool = False,
|
||||
is_target_platform: bool = False,
|
||||
config_schema=None,
|
||||
) -> MagicMock:
|
||||
stub = MagicMock()
|
||||
stub.multi_conf = multi_conf
|
||||
stub.is_platform_component = is_platform_component
|
||||
stub.is_target_platform = is_target_platform
|
||||
stub.config_schema = config_schema
|
||||
return stub
|
||||
|
||||
|
||||
@@ -89,8 +89,9 @@ def test_get_idedata_generates_and_caches(setup_core: Path) -> None:
|
||||
result = toolchain.get_idedata()
|
||||
|
||||
mock_transform.assert_called_once()
|
||||
assert result == {"cxx_path": "g++"}
|
||||
assert json.loads(cache.read_text()) == {"cxx_path": "g++"}
|
||||
prog_path = str(toolchain.get_elf_path())
|
||||
assert result == {"cxx_path": "g++", "prog_path": prog_path}
|
||||
assert json.loads(cache.read_text()) == {"cxx_path": "g++", "prog_path": prog_path}
|
||||
|
||||
|
||||
def test_get_idedata_uses_cache_when_valid(setup_core: Path) -> None:
|
||||
@@ -127,7 +128,7 @@ def test_get_idedata_regenerates_when_compile_commands_newer(setup_core: Path) -
|
||||
result = toolchain.get_idedata()
|
||||
|
||||
mock_transform.assert_called_once()
|
||||
assert result == {"cxx_path": "fresh"}
|
||||
assert result == {"cxx_path": "fresh", "prog_path": str(toolchain.get_elf_path())}
|
||||
|
||||
|
||||
def test_get_idedata_regenerates_on_corrupted_cache(setup_core: Path) -> None:
|
||||
@@ -147,7 +148,26 @@ def test_get_idedata_regenerates_on_corrupted_cache(setup_core: Path) -> None:
|
||||
result = toolchain.get_idedata()
|
||||
|
||||
mock_transform.assert_called_once()
|
||||
assert result == {"cxx_path": "regen"}
|
||||
assert result == {"cxx_path": "regen", "prog_path": str(toolchain.get_elf_path())}
|
||||
|
||||
|
||||
def test_get_idedata_prog_path_points_at_firmware_elf(setup_core: Path) -> None:
|
||||
"""The idedata exposes prog_path (the ELF) so consumers like build-action
|
||||
can locate firmware.factory.bin / firmware.ota.bin as its siblings."""
|
||||
compile_commands, _ = _setup_build(setup_core)
|
||||
compile_commands.parent.mkdir(parents=True, exist_ok=True)
|
||||
compile_commands.write_text("[]")
|
||||
|
||||
with patch(
|
||||
"esphome.espidf.idedata.idedata_from_build",
|
||||
return_value={"cxx_path": "g++"},
|
||||
):
|
||||
result = toolchain.get_idedata()
|
||||
|
||||
# Use Path semantics so the contract holds on Windows too (backslashes).
|
||||
prog_path = Path(result["prog_path"])
|
||||
assert prog_path.name == "firmware.elf"
|
||||
assert prog_path.parent.name == "build"
|
||||
|
||||
|
||||
def test_get_idf_env_sets_git_ceiling_directories(setup_core: Path) -> None:
|
||||
|
||||
@@ -32,6 +32,7 @@ from esphome.__main__ import (
|
||||
command_clean_all,
|
||||
command_config,
|
||||
command_config_hash,
|
||||
command_idedata,
|
||||
command_rename,
|
||||
command_run,
|
||||
command_update_all,
|
||||
@@ -689,6 +690,25 @@ def test_choose_upload_log_host_with_ota_device_with_ota_config() -> None:
|
||||
assert result == ["192.168.1.100"]
|
||||
|
||||
|
||||
def test_choose_upload_log_host_ota_mdns_disabled_uses_address_cache() -> None:
|
||||
"""A .local device with mDNS disabled resolves via the dashboard-supplied cache."""
|
||||
setup_core(
|
||||
config={
|
||||
CONF_API: {},
|
||||
CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME}],
|
||||
CONF_MDNS: {CONF_DISABLED: True},
|
||||
},
|
||||
address="esp32-a1s.local",
|
||||
)
|
||||
CORE.address_cache = AddressCache(mdns_cache={"esp32-a1s.local": ["192.168.1.50"]})
|
||||
|
||||
for purpose in (Purpose.LOGGING, Purpose.UPLOADING):
|
||||
result = choose_upload_log_host(
|
||||
default="OTA", check_default=None, purpose=purpose
|
||||
)
|
||||
assert result == ["192.168.1.50"]
|
||||
|
||||
|
||||
def test_choose_upload_log_host_with_ota_device_with_api_config() -> None:
|
||||
"""Test OTA device when API is configured (no upload without OTA in config)."""
|
||||
setup_core(config={CONF_API: {}}, address="192.168.1.100")
|
||||
@@ -3135,6 +3155,22 @@ def test_has_resolvable_address() -> None:
|
||||
setup_core(config={CONF_MDNS: {CONF_DISABLED: True}}, address=None)
|
||||
assert has_resolvable_address() is False
|
||||
|
||||
# mDNS disabled + .local, but the dashboard cached the address -> resolvable
|
||||
setup_core(
|
||||
config={CONF_MDNS: {CONF_DISABLED: True}}, address="esphome-device.local"
|
||||
)
|
||||
CORE.address_cache = AddressCache(
|
||||
mdns_cache={"esphome-device.local": ["192.168.1.100"]}
|
||||
)
|
||||
assert has_resolvable_address() is True
|
||||
|
||||
# mDNS disabled + .local, cache present but missing this host -> not resolvable
|
||||
setup_core(
|
||||
config={CONF_MDNS: {CONF_DISABLED: True}}, address="esphome-device.local"
|
||||
)
|
||||
CORE.address_cache = AddressCache(mdns_cache={"other-device.local": ["10.0.0.1"]})
|
||||
assert has_resolvable_address() is False
|
||||
|
||||
|
||||
def test_has_name_add_mac_suffix() -> None:
|
||||
"""Test has_name_add_mac_suffix function."""
|
||||
@@ -6222,3 +6258,28 @@ def test_command_run_defaults_subscribe_states_true(
|
||||
mock_run_logs.assert_called_once_with(
|
||||
CORE.config, ["192.168.1.100"], subscribe_states=True
|
||||
)
|
||||
|
||||
|
||||
def test_command_idedata_esp_idf_prints_json(capsys: CaptureFixture) -> None:
|
||||
"""Under the native ESP-IDF toolchain, idedata is emitted as JSON."""
|
||||
setup_core()
|
||||
CORE.toolchain = Toolchain.ESP_IDF
|
||||
data = {"cxx_path": "g++", "prog_path": "/build/firmware.elf"}
|
||||
|
||||
with patch("esphome.espidf.toolchain.get_idedata", return_value=data) as mock_get:
|
||||
result = command_idedata(MagicMock(), CORE.config)
|
||||
|
||||
assert result == 0
|
||||
mock_get.assert_called_once_with()
|
||||
assert json.loads(capsys.readouterr().out) == data
|
||||
|
||||
|
||||
def test_command_idedata_esp_idf_no_build_errors() -> None:
|
||||
"""Under ESP-IDF, a missing build (no idedata) returns an error, not a crash."""
|
||||
setup_core()
|
||||
CORE.toolchain = Toolchain.ESP_IDF
|
||||
|
||||
with patch("esphome.espidf.toolchain.get_idedata", return_value=None):
|
||||
result = command_idedata(MagicMock(), CORE.config)
|
||||
|
||||
assert result == 1
|
||||
|
||||
Reference in New Issue
Block a user