Compare commits

...

17 Commits

Author SHA1 Message Date
Jesse Hills
09dc41435c Merge pull request #16282 from esphome/bump-2026.4.5
2026.4.5
2026-05-07 08:12:15 +12:00
Jesse Hills
5283cdec12 Bump version to 2026.4.5 2026-05-07 07:25:35 +12:00
Edward Firmo
d9835c8705 [nextion] Fix text sensor state not updated on string response (#16280) 2026-05-07 07:25:35 +12:00
Mat931
b89c71c1ea [core] Fix WiFi connection in safe mode (#16269)
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
2026-05-07 07:25:35 +12:00
J. Nick Koston
7f6aef4f33 [substitutions] Fix sibling references inside dict-valued substitutions (#16273) 2026-05-07 07:25:35 +12:00
J. Nick Koston
016b509b55 [bundle] Include secrets.yaml when !secret keys are quoted (#16271) 2026-05-07 07:25:35 +12:00
Jesse Hills
d2bbaeccf3 [ha-addon] Add opt-in toggle for the new ESPHome Device Builder (#16247) 2026-05-07 07:25:35 +12:00
Jesse Hills
6fda5f41b2 Merge pull request #16240 from esphome/bump-2026.4.4
2026.4.4
2026-05-05 14:21:38 +12:00
Jesse Hills
197d4dac8e Bump version to 2026.4.4 2026-05-05 08:27:10 +12:00
Jesse Hills
2d7f9dc48d [api] Use safe_print for log output and fix safe_print bytes-repr fallback (#16160) 2026-05-05 08:27:04 +12:00
J. Nick Koston
be84e6c9f4 [api] Fall back to owning types for service array args used after a delay (#16140) 2026-05-05 08:22:05 +12:00
J. Nick Koston
0418f2138a [esp32] Drop printf wrap on IDF 6.0+ (picolibc no longer needs it) (#16189) 2026-05-05 08:22:05 +12:00
Clyde Stubbs
d9c22d6b56 [lvgl] Clamp values for meter line indicators (#16180) 2026-05-05 08:22:05 +12:00
J. Nick Koston
60a94fd109 [esp32] Replace 512B stack buffer in printf wraps with picolibc cookie FILE (#16170) 2026-05-05 08:22:05 +12:00
Jesse Hills
9371ec319a [core] Strip \\?\ prefix from sys.executable for PlatformIO subprocess (#16158) 2026-05-05 08:21:58 +12:00
J. Nick Koston
ce466c6b60 [mcp23xxx_base] Reject unsupported interrupt_pin options (inverted, allow_other_uses) (#16149) 2026-05-05 08:14:03 +12:00
Brandon Harvey
a460f5343c [automation] Fix codegen type for component.resume update_interval (#16069)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 08:14:02 +12:00
32 changed files with 848 additions and 138 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2026.4.3
PROJECT_NUMBER = 2026.4.5
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -0,0 +1,22 @@
#!/usr/bin/with-contenv bashio
# ==============================================================================
# Installs the latest prerelease of esphome-device-builder when the
# `use_new_device_builder` config option is enabled.
# This is a temporary install-on-boot step until esphome-device-builder
# becomes a direct dependency of esphome.
# ==============================================================================
if ! bashio::config.true 'use_new_device_builder'; then
exit 0
fi
bashio::log.info "Installing latest prerelease of esphome-device-builder..."
if command -v uv > /dev/null; then
uv pip install --system --no-cache-dir --prerelease=allow --upgrade \
esphome-device-builder ||
bashio::exit.nok "Failed installing esphome-device-builder."
else
pip install --no-cache-dir --pre --upgrade esphome-device-builder ||
bashio::exit.nok "Failed installing esphome-device-builder."
fi
bashio::log.info "Installed esphome-device-builder."

View File

@@ -49,5 +49,12 @@ if bashio::fs.directory_exists '/config/esphome/.esphome'; then
rm -rf /config/esphome/.esphome
fi
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "Starting ESPHome Device Builder..."
exec esphome-device-builder /config/esphome \
--ha-addon \
--ingress-port "$(bashio::addon.ingress_port)"
fi
bashio::log.info "Starting ESPHome dashboard..."
exec esphome dashboard /config/esphome --socket /var/run/esphome.sock --ha-addon

View File

@@ -4,6 +4,14 @@
# Community Hass.io Add-ons: ESPHome
# Configures NGINX for use with ESPHome
# ==============================================================================
# When the new device builder is enabled it serves HA ingress directly,
# so nginx is not used at all -- skip configuration.
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "Skipping NGINX setup: new device builder serves ingress directly."
bashio::exit.ok
fi
mkdir -p /var/log/nginx
# Generate Ingress configuration

View File

@@ -5,6 +5,14 @@
# Runs the NGINX proxy
# ==============================================================================
# The new device builder handles HA ingress itself, so nginx is bypassed.
# Block the longrun forever so s6 keeps the dependency satisfied and does
# not respawn it.
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "NGINX bypassed: new device builder serves ingress directly."
exec sleep infinity
fi
bashio::log.info "Waiting for ESPHome dashboard to come up..."
while [[ ! -S /var/run/esphome.sock ]]; do

View File

@@ -598,7 +598,7 @@ async def component_resume_action_to_code(
comp = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, comp)
if CONF_UPDATE_INTERVAL in config:
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32)
cg.add(var.set_update_interval(template_))
return var

View File

@@ -98,11 +98,13 @@ _KNOWN_FILE_EXTENSIONS = frozenset(
)
# Matches !secret references in YAML text. This is intentionally a simple
# regex scan rather than a YAML parse — it may match inside comments or
# multi-line strings, which is the conservative direction (include more
# secrets rather than fewer).
_SECRET_RE = re.compile(r"!secret\s+(\S+)")
# Matches !secret references in YAML text. An optional surrounding
# quote pair around the key is allowed and ignored: YAML treats
# ``!secret 'foo'`` and ``!secret foo`` as the same key. This is
# intentionally a simple regex scan rather than a YAML parse — it may
# match inside comments or multi-line strings, which is the conservative
# direction (include more secrets rather than fewer).
_SECRET_RE = re.compile(r"""!secret\s+['"]?([^\s'"]+)""")
def _find_used_secret_keys(yaml_files: list[Path]) -> set[str]:

View File

@@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_(
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Owning element type for each YAML service variable type. Used to derive both
# the zero-copy native types and the owning fallback types below.
_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = {
"bool": cg.bool_,
"int": cg.int32,
"float": cg.float_,
"string": cg.std_string,
}
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Scalars are passed by value; string uses a non-owning view into rx_buf_.
**_SERVICE_ARG_SCALAR_TYPES,
"string": cg.StringRef,
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
"string[]": cg.FixedVector.template(cg.std_string)
.operator("const")
.operator("ref"),
# Arrays are passed as non-owning const references into rx_buf_.
**{
f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref")
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
# Owning fallback types used when the action chain contains non-synchronous actions
# (delay, wait_until, script.wait, etc.). The default non-owning types reference
# storage in the receive buffer, which is reused once the synchronous portion of
# the chain returns. FixedVector is also non-copyable, so the deferred lambda
# capture in DelayAction::play_complex would fail to compile.
SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = {
"string": cg.std_string,
**{
f"{name}[]": cg.std_vector.template(t)
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
CONF_ENCRYPTION = "encryption"
CONF_BATCH_DELAY = "batch_delay"
@@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None:
func_args.append((cg.bool_, "return_response"))
# Check if action chain has non-synchronous actions that would make
# non-owning StringRef dangle (rx_buf_ reused after delay)
# non-owning args (StringRef, const FixedVector&) dangle once the
# rx_buf_ is reused after a delay/wait_until/script.wait/etc. The
# FixedVector references would also fail to compile because they
# are non-copyable and DelayAction captures args by value.
has_non_synchronous = automation.has_non_synchronous_actions(
conf.get(CONF_THEN, [])
)
service_arg_names: list[str] = []
for name, var_ in conf[CONF_VARIABLES].items():
native = SERVICE_ARG_NATIVE_TYPES[var_]
# Fall back to std::string for string args if non-synchronous actions exist
if has_non_synchronous and native is cg.StringRef:
native = cg.std_string
if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES:
native = SERVICE_ARG_FALLBACK_TYPES[var_]
else:
native = SERVICE_ARG_NATIVE_TYPES[var_]
service_template_args.append(native)
func_args.append((native, name))
service_arg_names.append(name)

View File

@@ -20,6 +20,7 @@ import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE
from esphome.platformio_api import process_stacktrace
from esphome.util import safe_print
from . import CONF_ENCRYPTION
@@ -60,7 +61,6 @@ async def async_run_logs(
noise_psk=noise_psk,
addresses=addresses, # Pass all addresses for automatic retry
)
dashboard = CORE.dashboard
backtrace_state = False
# Try platform-specific stacktrace handler first, fall back to generic
@@ -82,7 +82,11 @@ async def async_run_logs(
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
)
for parsed_msg in parse_log_message(text, timestamp):
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
# safe_print handles the dashboard \033 escaping and falls back
# to backslashreplace encoding on stdouts that can't represent
# the wifi signal-bar block characters (Windows redirected
# cp1252 pipe).
safe_print(parsed_msg)
for raw_line in text.splitlines():
if platform_process_stacktrace:
backtrace_state = platform_process_stacktrace(

View File

@@ -1740,7 +1740,17 @@ async def to_code(config):
# Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r
# (~11 KB). See printf_stubs.cpp for implementation.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]:
#
# The wrap is only beneficial against newlib. Picolibc's tinystdio
# implements vsnprintf by building a string-output FILE and calling
# vfprintf, so vfprintf is unconditionally linked in by any caller
# of snprintf/vsnprintf — effectively every build — and the wrap
# saves nothing while costing ~170 B of shim. IDF 5.x defaults to
# newlib on every variant; IDF 6.0+ switches to picolibc on every
# variant.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version(
6, 0, 0
):
cg.add_define("USE_FULL_PRINTF")
else:
for symbol in ("vprintf", "printf", "fprintf", "vfprintf"):

View File

@@ -1,32 +1,38 @@
/*
* Linker wrap stubs for FILE*-based printf functions.
* Linker wrap stubs for FILE*-based printf functions (newlib only).
*
* ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference
* fprintf(), printf(), vprintf(), and vfprintf() which pull in the full
* printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on
* picolibc's vfprintf). This is a separate implementation from the one
* used by snprintf/vsnprintf that handles FILE* stream I/O with buffering
* and locking.
* fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull
* in _vfprintf_r (~11 KB) — a separate implementation from the one used
* by snprintf/vsnprintf that handles FILE* stream I/O with buffering.
*
* ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(),
* so the SDK's vprintf() path is dead code at runtime. The fprintf()
* and printf() calls in SDK components are only in debug/assert paths
* (gpio_dump_io_configuration, ringbuf diagnostics) that are either
* GC'd or never called. Crash backtraces and panic output are
* unaffected they use esp_rom_printf() which is a ROM function
* and does not go through libc.
* unaffected; they use esp_rom_printf() which is a ROM function and
* does not go through libc.
*
* These stubs redirect through vsnprintf() (which uses _svfprintf_r
* already in the binary) and fwrite(), allowing the linker to
* dead-code eliminate _vfprintf_r.
* This wrap is newlib-only. On picolibc, vsnprintf is implemented as
* vfprintf into a string-output FILE, so vfprintf is unconditionally
* linked in by any caller of snprintf/vsnprintf and the wrap can never
* elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF
* on picolibc builds (IDF 6.0+ on all variants) so this file compiles
* to nothing there; the #error below catches a desynchronised gate.
*
* Saves ~11 KB of flash.
* Saves ~11 KB of flash on newlib.
*
* To disable these wraps, set enable_full_printf: true in the esp32
* advanced config section.
* To disable this wrap on newlib, set enable_full_printf: true in the
* esp32 advanced config section.
*/
#if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF)
#ifdef __PICOLIBC__
#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF"
#endif
#include <cstdarg>
#include <cstdio>
@@ -34,6 +40,9 @@
namespace esphome::esp32 {}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
static constexpr size_t PRINTF_BUFFER_SIZE = 512;
// These stubs are essentially dead code at runtime — ESPHome replaces the
@@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) {
return len;
}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
int __wrap_vprintf(const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_printf(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
@@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) {
return len;
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_fprintf(FILE *stream, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);

View File

@@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) {
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) {
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) {
auto *scale = lv_obj_get_parent(obj);
auto min_value = lv_scale_get_range_min_value(scale);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) +
auto max_value = lv_scale_get_range_max_value(scale);
value = clamp(value, min_value, max_value);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) +
lv_scale_get_rotation((scale))) %
360;
}

View File

@@ -112,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images
#endif // USE_LVGL_ANIMIMG
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value);
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value);
#endif
// Parent class for things that wrap an LVGL object

View File

@@ -2,6 +2,7 @@ from esphome import pins
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ALLOW_OTHER_USES,
CONF_ID,
CONF_INPUT,
CONF_INTERRUPT,
@@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = {
"FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING,
}
def _validate_interrupt_pin(value):
# The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR)
# and installs a single ISR per GPIO, so neither inversion nor sharing is supported.
value = pins.internal_gpio_input_pin_schema(value)
if value.get(CONF_INVERTED):
raise cv.Invalid(
f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"the MCP23xxx INT line is fixed active-low"
)
if value.get(CONF_ALLOW_OTHER_USES):
raise cv.Invalid(
f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"sharing the interrupt pin between multiple MCP23xxx (or other components) "
"is not implemented. Remove the interrupt_pin to fall back to polling."
)
return value
MCP23XXX_CONFIG_SCHEMA = cv.Schema(
{
cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean,
cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema,
cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin,
}
).extend(cv.COMPONENT_SCHEMA)

View File

@@ -641,6 +641,7 @@ void Nextion::process_nextion_commands_() {
} else {
ESP_LOGN(TAG, "String resp: '%s' id: %s type: %s", to_process.c_str(), component->get_variable_name().c_str(),
component->get_queue_type_string());
component->set_state_from_string(to_process, true, false);
}
delete nb; // NOLINT(cppcoreguidelines-owning-memory)

View File

@@ -297,7 +297,18 @@ def _push_context(
"""Resolve a variable, recursively resolving any dependencies it references."""
value = unresolved_vars.pop(key, Missing)
if value is Missing:
return Missing
# Either already resolved (in resolved_vars) or currently being
# resolved (self-reference from inside a dict-valued substitution).
# Returning what we have lets sibling references inside a dict
# value, e.g. ``${device.manufacturer}`` inside ``device.name``,
# see literal sibling values during their own resolution.
return resolved_vars.get(key, Missing)
if isinstance(value, dict):
# Dict-valued substitutions form a namespace; eagerly publish the
# original mapping so its members can reference each other while
# the dict's own substitution pass is still running. The entry is
# replaced with the fully-substituted dict once recursion returns.
resolved_vars[key] = value
try:
value = substitute(value, [], resolver_context, True)
except UndefinedError as err:

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2026.4.3"
__version__ = "2026.4.5"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -562,14 +562,9 @@ async def _add_controller_registry_define() -> None:
@coroutine_with_priority(CoroPriority.FINAL)
async def _add_looping_components() -> None:
# Emit a constexpr that computes the looping component count at C++ compile time
# and pre-init the FixedVector with the exact capacity. Uses std::is_same_v to
# detect loop() overrides. The constexpr goes in main.cpp's global section where
# all component types are in scope. calculate_looping_components_() then skips
# the counting pass and only does the two population passes.
# Emit ESPHOME_LOOPING_COMPONENT_COUNT. Sizing of looping_components_
# happens in core to_code() so it lands before safe_mode's early return.
entries = CORE.data.get("looping_component_entries", [])
if not entries:
return
# Build constexpr sum for the exact count, deduplicating by type
# Uses HasLoopOverride<T> which handles ambiguous &T::loop from multiple inheritance
@@ -577,7 +572,7 @@ async def _add_looping_components() -> None:
terms = [
f"({count} * HasLoopOverride<{cpp_type}>::value)"
for cpp_type, count in type_counts.items()
]
] or ["0"]
constexpr_expr = " + \\\n ".join(terms)
cg.add_global(
cg.RawStatement(
@@ -586,14 +581,6 @@ async def _add_looping_components() -> None:
)
)
# Pre-init FixedVector with exact capacity so calculate_looping_components_()
# can skip the counting pass
cg.add(
cg.RawExpression(
"App.looping_components_.init(ESPHOME_LOOPING_COMPONENT_COUNT)"
)
)
@coroutine_with_priority(CoroPriority.CORE)
async def to_code(config: ConfigType) -> None:
@@ -636,6 +623,14 @@ async def to_code(config: ConfigType) -> None:
# Define component count for static allocation
cg.add_define("ESPHOME_COMPONENT_COUNT", len(CORE.component_ids))
# Pre-init FixedVector with exact capacity so calculate_looping_components_()
# can skip the counting pass
cg.add(
cg.RawExpression(
"App.looping_components_.init(ESPHOME_LOOPING_COMPONENT_COUNT)"
)
)
CORE.add_job(_add_platform_defines)
CORE.add_job(_add_controller_registry_define)
CORE.add_job(_add_looping_components)

View File

@@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [
]
def _strip_win_long_path_prefix(path: str) -> str:
r"""Strip the Windows extended-length path prefix from ``path``.
Handles both forms documented at
https://learn.microsoft.com/windows/win32/fileio/naming-a-file:
* ``\\?\C:\path\to\file`` -> ``C:\path\to\file``
* ``\\?\UNC\server\share\path`` -> ``\\server\share\path``
The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with
``sys.executable`` already prefixed with ``\\?\``. That prefix propagates
into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from
the environment, falling back to ``os.path.normpath(sys.executable)``)
and ends up baked into SCons-emitted command lines for build steps such
as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand
the ``\\?\`` prefix, so the build fails with
"The system cannot find the path specified." Stripping the prefix early
keeps the path shell-quotable.
No-op on non-Windows platforms.
"""
if sys.platform != "win32":
return path
if path.startswith("\\\\?\\UNC\\"):
# \\?\UNC\server\share\... -> \\server\share\...
return "\\\\" + path[len("\\\\?\\UNC\\") :]
if path.startswith("\\\\?\\"):
return path[len("\\\\?\\") :]
return path
def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
@@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
# Increase uv retry count to handle transient network errors (default is 3)
os.environ.setdefault("UV_HTTP_RETRIES", "10")
cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args)
# Strip the Windows extended-length path prefix from sys.executable so it
# doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted
# command lines run through cmd.exe.
python_exe = _strip_win_long_path_prefix(sys.executable)
if python_exe != sys.executable:
# Only override PYTHONEXEPATH when we actually stripped a prefix.
# PlatformIO's get_pythonexe_path() reads this and falls back to
# sys.executable otherwise; setting it unconditionally would clobber
# a user-provided value (or the unmodified path on platforms that
# don't need the strip).
os.environ["PYTHONEXEPATH"] = python_exe
cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args)
return run_external_process(*cmd, **kwargs)

View File

@@ -94,13 +94,29 @@ def safe_print(message="", end="\n"):
except UnicodeEncodeError:
pass
# Fall back to the stream's actual encoding (e.g. cp1252 on Windows
# redirected pipes). Use "backslashreplace" so unencodable code points
# like the wifi signal-bar block characters (U+2582..U+2588) become
# readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print``
# never receives a ``bytes`` object (which would render as a ``b'...'``
# repr).
encoding = sys.stdout.encoding or "ascii"
try:
print(message.encode("utf-8", "backslashreplace"), end=end)
print(
message.encode(encoding, "backslashreplace").decode(encoding),
end=end,
)
return
except UnicodeEncodeError:
try:
print(message.encode("ascii", "backslashreplace"), end=end)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
pass
try:
print(
message.encode("ascii", "backslashreplace").decode("ascii"),
end=end,
)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
def safe_input(prompt=""):

View File

@@ -91,6 +91,24 @@ api:
- float_arr.size()
- string_arr[0].c_str()
- string_arr.size()
# Test array + string args used after a non-synchronous action (delay).
# The default non-owning types (StringRef, const FixedVector&) would
# dangle once rx_buf_ is reused, and FixedVector is non-copyable so
# DelayAction's lambda capture would fail to compile. The api codegen
# must fall back to owning std::string / std::vector here.
- action: array_with_delay
variables:
name: string
int_arr: int[]
string_arr: string[]
then:
- delay: 20ms
- logger.log:
format: "Delayed: %s (%u ints, %u strings)"
args:
- name.c_str()
- int_arr.size()
- string_arr.size()
# Test ContinuationAction (IfAction with then/else branches)
- action: test_if_action
variables:

View File

@@ -501,14 +501,15 @@ async def _read_stream_lines(
@asynccontextmanager
async def run_binary_and_wait_for_port(
async def run_binary(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
) -> AsyncGenerator[tuple[asyncio.subprocess.Process, list[str]]]:
"""Run a binary under a PTY, capture log output, and clean up on exit.
Yields the running ``Process`` and a live list of captured log lines.
No port wait -- callers that need that should use
``run_binary_and_wait_for_port``."""
# Create a pseudo-terminal to make the binary think it's running interactively
# This is needed because the ESPHome host logger checks isatty()
controller_fd, device_fd = pty.openpty()
@@ -535,7 +536,6 @@ async def run_binary_and_wait_for_port(
controller_transport, _ = await loop.connect_read_pipe(
lambda: controller_protocol, os.fdopen(controller_fd, "rb", 0)
)
output_reader = controller_reader
if process.returncode is not None:
raise RuntimeError(
@@ -543,27 +543,59 @@ async def run_binary_and_wait_for_port(
"Ensure the binary is valid and can run successfully."
)
# Wait for the API server to start listening
loop = asyncio.get_running_loop()
start_time = loop.time()
# Start collecting output
stdout_lines: list[str] = []
output_tasks: list[asyncio.Task] = []
output_task = asyncio.create_task(
_read_stream_lines(controller_reader, stdout_lines, sys.stdout, line_callback)
)
try:
# Read from output stream
output_tasks = [
asyncio.create_task(
_read_stream_lines(
output_reader, stdout_lines, sys.stdout, line_callback
)
)
]
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
yield process, stdout_lines
finally:
output_task.cancel()
result = await asyncio.gather(output_task, return_exceptions=True)
if isinstance(result[0], Exception) and not isinstance(
result[0], asyncio.CancelledError
):
print(f"Error reading from PTY: {result[0]}", file=sys.stderr)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def run_binary_and_wait_for_port(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
async with run_binary(binary_path, line_callback=line_callback) as (
process,
stdout_lines,
):
loop = asyncio.get_running_loop()
start_time = loop.time()
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
@@ -593,41 +625,6 @@ async def run_binary_and_wait_for_port(
raise TimeoutError(error_msg)
finally:
# Cancel output collection tasks
for task in output_tasks:
task.cancel()
# Wait for tasks to complete and check for exceptions
results = await asyncio.gather(*output_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(
result, asyncio.CancelledError
):
print(
f"Error reading from PTY: {result}",
file=sys.stderr,
)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def run_compiled_context(

View File

@@ -0,0 +1,25 @@
esphome:
name: safe-mode-loop-runs
host:
logger:
safe_mode:
num_attempts: 10
on_safe_mode:
- lambda: |-
// Spawn a detached thread that logs a unique marker. The
// non-main-thread log goes through the task log buffer, which
// is only drained by Logger::loop(). If looping components
// weren't initialized (the bug fixed in #16269), the buffer is
// never read and the marker never reaches the console.
struct MarkerThread {
static void *thread_func(void *) {
ESP_LOGI("safe_mode_test", "looping component ran in safe mode");
return nullptr;
}
};
pthread_t t;
pthread_create(&t, nullptr, MarkerThread::thread_func, nullptr);
pthread_detach(t);

View File

@@ -0,0 +1,39 @@
"""Helpers for manipulating the host platform's preferences file.
ESPHome's host platform stores preferences in
``~/.esphome/prefs/<app_name>.prefs`` using a simple binary layout that
mirrors ``HostPreferences::sync()``:
``[uint32_t key][uint8_t len][uint8_t data[len]]`` per entry.
Tests use these helpers to pre-populate state the binary will see at
boot (e.g. forcing safe mode) or to clear stale state between runs.
"""
from __future__ import annotations
from pathlib import Path
import struct
def host_prefs_path(device_name: str) -> Path:
"""Return the on-disk prefs file path for a host-platform device."""
return Path.home() / ".esphome" / "prefs" / f"{device_name}.prefs"
def clear_host_prefs(device_name: str) -> None:
"""Delete the prefs file for a host-platform device, if it exists."""
host_prefs_path(device_name).unlink(missing_ok=True)
def write_host_pref(device_name: str, key: int, data: bytes) -> Path:
"""Write a single preference entry, replacing the file's contents.
Returns the path that was written.
"""
if len(data) > 255:
raise ValueError(f"Preference data too long: {len(data)} bytes (max 255)")
path = host_prefs_path(device_name)
path.parent.mkdir(parents=True, exist_ok=True)
payload = struct.pack("<IB", key, len(data)) + data
path.write_bytes(payload)
return path

View File

@@ -0,0 +1,94 @@
"""Regression test for safe_mode + looping_components init ordering.
Reproduces the bug fixed in https://github.com/esphome/esphome/pull/16269:
``App.looping_components_.init(...)`` was emitted at ``CoroPriority.FINAL``,
which placed it *after* the ``safe_mode`` early-return in ``setup_app()``.
When safe mode was entered, the ``FixedVector`` backing the looping-component
list was never sized, ``looping_components_active_end_`` stayed at 0, and
``loop()`` iterated zero components -- so any looping component above
``CoroPriority.APPLICATION`` (e.g. wifi, logger) never ran.
The test forces safe mode by writing ``ENTER_SAFE_MODE_MAGIC`` to the host
preferences file before booting, then asserts that ``Logger::loop()`` runs
by logging from a non-main thread. Non-main-thread logs are buffered in
``TaskLogBuffer`` and only emitted to the console when ``Logger::loop()``
drains the buffer. Without the fix, the marker stays in the buffer
forever; with the fix, it reaches the console.
The API server (``CoroPriority.WEB``, 40) is registered below safe_mode
(``CoroPriority.APPLICATION``, 50), so it's never set up when safe mode
is active and ``run_compiled`` would hang waiting for the API port.
This test uses ``run_binary`` directly to skip the port wait.
"""
from __future__ import annotations
import asyncio
import re
import struct
import pytest
from .conftest import run_binary
from .host_prefs import clear_host_prefs, write_host_pref
from .types import CompileFunction, ConfigWriter
# Must match esphome::safe_mode::RTC_KEY in safe_mode.h
SAFE_MODE_RTC_KEY = 233825507
# Must match esphome::safe_mode::SafeModeComponent::ENTER_SAFE_MODE_MAGIC
ENTER_SAFE_MODE_MAGIC = 0x5AFE5AFE
DEVICE_NAME = "safe-mode-loop-runs"
THREAD_LOG_MARKER = "looping component ran in safe mode"
@pytest.mark.asyncio
async def test_safe_mode_loop_runs(
yaml_config: str,
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
) -> None:
"""When safe mode is active, ``App.loop()`` must still iterate looping
components -- proven here by a thread-logged marker reaching the
console (which requires ``Logger::loop()`` to run)."""
config_path = await write_yaml_config(yaml_config)
binary_path = await compile_esphome(config_path)
# Compile finished successfully; pre-populate prefs so the *next* run
# enters safe mode immediately.
write_host_pref(
DEVICE_NAME, SAFE_MODE_RTC_KEY, struct.pack("<I", ENTER_SAFE_MODE_MAGIC)
)
try:
loop = asyncio.get_running_loop()
safe_mode_active = loop.create_future()
thread_log_seen = loop.create_future()
safe_mode_pattern = re.compile(r"SAFE MODE IS ACTIVE")
thread_log_pattern = re.compile(re.escape(THREAD_LOG_MARKER))
def on_log(line: str) -> None:
if not safe_mode_active.done() and safe_mode_pattern.search(line):
safe_mode_active.set_result(True)
if not thread_log_seen.done() and thread_log_pattern.search(line):
thread_log_seen.set_result(True)
async with run_binary(binary_path, line_callback=on_log):
try:
await asyncio.wait_for(safe_mode_active, timeout=15.0)
except TimeoutError:
pytest.fail(
"Did not observe 'SAFE MODE IS ACTIVE' -- safe mode "
"didn't trigger, so this test isn't exercising the bug."
)
try:
await asyncio.wait_for(thread_log_seen, timeout=10.0)
except TimeoutError:
pytest.fail(
f"Did not observe thread-logged marker {THREAD_LOG_MARKER!r} "
"within timeout. Logger::loop() never drained the task "
"log buffer, meaning App.looping_components_ was never "
"sized -- this is the regression #16269 fixed."
)
finally:
clear_host_prefs(DEVICE_NAME)

View File

@@ -9,7 +9,6 @@ Tests that:
from __future__ import annotations
import asyncio
from pathlib import Path
import socket
from typing import Any
@@ -17,9 +16,12 @@ from aioesphomeapi import TextInfo, TextState
import pytest
from .conftest import run_binary_and_wait_for_port, wait_and_connect_api_client
from .host_prefs import clear_host_prefs
from .state_utils import InitialStateHelper, require_entity
from .types import CompileFunction, ConfigWriter
DEVICE_NAME = "host-template-text-save-test"
@pytest.mark.asyncio
async def test_template_text_save(
@@ -32,11 +34,7 @@ async def test_template_text_save(
port, port_socket = reserved_tcp_port
# Clean up any stale preference file from previous runs
prefs_file = (
Path.home() / ".esphome" / "prefs" / "host-template-text-save-test.prefs"
)
if prefs_file.exists():
prefs_file.unlink()
clear_host_prefs(DEVICE_NAME)
# Write and compile once
config_path = await write_yaml_config(yaml_config)
@@ -59,7 +57,7 @@ async def test_template_text_save(
wait_and_connect_api_client(port=port) as client,
):
device_info = await client.device_info()
assert device_info.name == "host-template-text-save-test"
assert device_info.name == DEVICE_NAME
entities, _ = await client.list_entities_services()
text_entity = require_entity(
@@ -127,5 +125,4 @@ async def test_template_text_save(
)
# Clean up preference file
if prefs_file.exists():
prefs_file.unlink()
clear_host_prefs(DEVICE_NAME)

View File

@@ -10,6 +10,7 @@ from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import config_validation as cv, core
from esphome.components.safe_mode import to_code as safe_mode_to_code
from esphome.const import (
CONF_AREA,
CONF_AREAS,
@@ -312,6 +313,75 @@ def test_add_platform_defines_priority() -> None:
)
def test_to_code_priority_above_safe_mode() -> None:
"""Test that core to_code emits the looping_components_ init before safe_mode.
Regression test for https://github.com/esphome/esphome/issues/16262.
safe_mode emits an `if (should_enter_safe_mode(...)) return;` line in main()
at APPLICATION priority. The `App.looping_components_.init(...)` call must be
emitted at a higher priority than APPLICATION so it lands in main() before
the early return; otherwise the FixedVector is never sized when safe mode is
active and loop() never runs (Wi-Fi never connects).
"""
assert config.to_code.priority > safe_mode_to_code.priority, (
f"core to_code priority ({config.to_code.priority}) must be greater than "
f"safe_mode to_code priority ({safe_mode_to_code.priority}) so that "
"App.looping_components_.init() is emitted before safe_mode's early return"
)
@pytest.mark.asyncio
async def test_add_looping_components_handles_empty_entries() -> None:
"""Test that _add_looping_components emits a valid constexpr when there are
no looping component entries.
With zero entries the generated constexpr must still be syntactically valid
C++ (`= 0;`), not an empty expression (`= ;`). This guards the empty-list
case that would otherwise produce uncompilable main.cpp output.
"""
CORE.data["looping_component_entries"] = []
await config._add_looping_components()
constexpr_lines = [
str(s)
for s in CORE.global_statements
if "ESPHOME_LOOPING_COMPONENT_COUNT" in str(s)
]
assert len(constexpr_lines) == 1
text = constexpr_lines[0]
assert "static constexpr size_t ESPHOME_LOOPING_COMPONENT_COUNT" in text
# The right-hand side must contain a literal `0`, not be empty.
rhs = text.split("=", 1)[1]
assert "0" in rhs
assert rhs.strip().rstrip(";").strip(), (
f"constexpr right-hand side must not be empty, got: {text!r}"
)
@pytest.mark.asyncio
async def test_add_looping_components_with_entries() -> None:
"""Test that _add_looping_components builds a HasLoopOverride sum from entries."""
CORE.data["looping_component_entries"] = [
"esphome::wifi::WiFiComponent",
"esphome::logger::Logger",
"esphome::wifi::WiFiComponent",
]
await config._add_looping_components()
constexpr_lines = [
str(s)
for s in CORE.global_statements
if "ESPHOME_LOOPING_COMPONENT_COUNT" in str(s)
]
assert len(constexpr_lines) == 1
text = constexpr_lines[0]
# Deduplicated by type, with per-type counts as multiplier.
assert "(2 * HasLoopOverride<esphome::wifi::WiFiComponent>::value)" in text
assert "(1 * HasLoopOverride<esphome::logger::Logger>::value)" in text
def test_valid_include_with_angle_brackets() -> None:
"""Test valid_include accepts angle bracket includes."""
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"

View File

@@ -0,0 +1,16 @@
substitutions:
device:
manufacturer: espressif
model: esp32
mac_suffix: ffffff
name: espressif-esp32-ffffff
network:
host: example.com
port: 8080
url: http://example.com:8080/api
esphome:
name: espressif-esp32-ffffff
test_list:
- espressif-esp32-ffffff
- http://example.com:8080/api
- espressif/esp32

View File

@@ -0,0 +1,18 @@
substitutions:
device:
manufacturer: "espressif"
model: "esp32"
mac_suffix: "ffffff"
name: ${device.manufacturer}-${device.model}-${device.mac_suffix}
network:
host: "example.com"
port: 8080
url: "http://${network.host}:${network.port}/api"
esphome:
name: ${device.name}
test_list:
- ${device.name}
- ${network.url}
- "${device.manufacturer}/${device.model}"

View File

@@ -170,6 +170,23 @@ def test_find_used_secret_keys_deduplicates(tmp_path: Path) -> None:
assert keys == {"key1"}
def test_find_used_secret_keys_quoted(tmp_path: Path) -> None:
"""Quoted !secret keys should resolve to the same key as unquoted form.
YAML strips surrounding quotes during parsing, so the secrets.yaml
lookup uses the unquoted key. The bundle scan must do the same.
"""
yaml1 = tmp_path / "a.yaml"
yaml1.write_text(
"single: !secret 'wifi_ssid'\n"
'double: !secret "wifi_pw"\n'
"bare: !secret api_key\n"
)
keys = _find_used_secret_keys([yaml1])
assert keys == {"wifi_ssid", "wifi_pw", "api_key"}
# ---------------------------------------------------------------------------
# _add_bytes_to_tar
# ---------------------------------------------------------------------------
@@ -1217,6 +1234,35 @@ def test_create_bundle_filters_secrets(tmp_path: Path) -> None:
assert "should_not_appear" not in secrets_data
def test_create_bundle_filters_secrets_quoted(tmp_path: Path) -> None:
"""Bundling must include secrets.yaml when !secret keys are quoted.
Regression test for issue 16259: quoted !secret references previously
captured the quotes as part of the key, so no key matched secrets.yaml
entries and the secrets file was dropped from the bundle entirely.
"""
config_dir = _setup_config_dir(tmp_path)
secrets = config_dir / "secrets.yaml"
secrets.write_text("ota_password: hunter2\nunused: should_not_appear\n")
config_yaml = "ota:\n password: !secret 'ota_password'\n"
(config_dir / "test.yaml").write_text(config_yaml)
creator = ConfigBundleCreator({})
result = creator.create_bundle()
assert result.manifest[ManifestKey.HAS_SECRETS] is True
buf = io.BytesIO(result.data)
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
secrets_data = tar.extractfile("secrets.yaml").read().decode()
assert "ota_password" in secrets_data
assert "hunter2" in secrets_data
assert "unused" not in secrets_data
def test_create_bundle_no_secrets(tmp_path: Path) -> None:
_setup_config_dir(tmp_path)

View File

@@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables(
assert "arg" in args
@pytest.mark.parametrize(
("platform", "input_path", "expected"),
[
# win32: drive-letter extended-length prefix is stripped
(
"win32",
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# win32: UNC extended-length prefix is translated to a regular UNC path
(
"win32",
"\\\\?\\UNC\\server\\share\\python.exe",
"\\\\server\\share\\python.exe",
),
# win32: paths without the prefix are returned unchanged
(
"win32",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# non-win32: prefix is left alone (no-op)
("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"),
("darwin", "/usr/bin/python3", "/usr/bin/python3"),
],
)
def test_strip_win_long_path_prefix(
platform: str, input_path: str, expected: str
) -> None:
r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32."""
with patch("esphome.platformio_api.sys.platform", platform):
assert platformio_api._strip_win_long_path_prefix(input_path) == expected
def test_run_platformio_cli_strips_win_long_path_prefix(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess.
The NSIS-installed esphome.exe launcher starts Python with
``sys.executable`` already prefixed by the extended-length path marker.
That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and
break SCons-emitted command lines run through ``cmd.exe``.
"""
CORE.build_path = str(setup_core / "build" / "test")
prefixed_exe = (
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
stripped_exe = (
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "win32"),
patch("esphome.platformio_api.sys.executable", prefixed_exe),
):
# Pop any pre-existing PYTHONEXEPATH so the assertion below reflects
# what run_platformio_cli set, not whatever the test runner's
# environment happened to contain.
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# The subprocess is invoked with the stripped executable path.
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == stripped_exe
# PYTHONEXEPATH is exported with the stripped path so PlatformIO's
# get_pythonexe_path() picks it up in the subprocess.
assert os.environ["PYTHONEXEPATH"] == stripped_exe
def test_run_platformio_cli_does_not_set_pythonexepath_without_strip(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix.
Setting it unconditionally would clobber a user-provided value (or
interfere with non-Windows tooling that has no prefix to strip).
"""
CORE.build_path = str(setup_core / "build" / "test")
plain_exe = "/usr/bin/python3"
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "linux"),
patch("esphome.platformio_api.sys.executable", plain_exe),
):
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == plain_exe
assert "PYTHONEXEPATH" not in os.environ
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:

View File

@@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None:
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
class TestSafePrint:
"""Tests for ``safe_print`` and its UnicodeEncodeError fallback chain."""
@pytest.fixture(autouse=True)
def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Default ``CORE.dashboard`` to False so each test starts hermetic."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", False)
def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None:
"""ASCII-only messages take the fast path through native ``print``."""
util.safe_print("hello world")
assert capsys.readouterr().out == "hello world\n"
def test_prints_unicode_on_utf8_stdout(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Non-ASCII goes straight through when stdout can encode it."""
util.safe_print("bars: \u2582\u2584\u2586\u2588")
assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n"
def test_dashboard_escapes_esc_byte(
self,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
util.safe_print("\033[0;32mhi\033[0m")
assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n"
def test_fallback_writes_string_not_bytes_repr(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Regression: cp1252 fallback must produce a printable str, not ``b'...'``.
On Windows, when stdout is a redirected pipe (e.g. the dashboard),
Python uses cp1252, which cannot encode the wifi signal-bar block
characters (U+2582..U+2588). The previous fallback path called
``print(message.encode(...))`` with a ``bytes`` object, which
Python's ``print`` rendered as a literal ``b'...'`` repr — visible
in the user's dashboard output. The fix re-encodes through the
stream's encoding with ``backslashreplace`` and decodes back to
``str``.
"""
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("bars: \u2582\u2584\u2586\u2588 done")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Output is a clean line, not the bytes repr.
assert not output.startswith("b'")
assert "b'bars" not in output
# Unencodable codepoints become readable backslash escapes.
assert "\\u2582\\u2584\\u2586\\u2588" in output
# Encodable parts survive unchanged.
assert "bars: " in output
assert " done" in output
assert output.endswith("\n")
def test_fallback_with_dashboard_escaped_message(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Dashboard ESC escaping + cp1252 fallback compose correctly."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Dashboard escaping turned ESC into literal "\033" (5 chars), which
# cp1252 can encode, so it survives the round-trip verbatim.
assert "\\033[0;32m" in output
assert "\\033[0m" in output
# Block characters became backslash escapes via backslashreplace.
assert "\\u2582\\u2584\\u2586\\u2588" in output
def test_final_message_when_locale_is_invalid(
self,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
"""If every encoding path fails, surface the locale-error sentinel."""
original_print = print
call_count = 0
def fake_print(*args: Any, **kwargs: Any) -> None:
nonlocal call_count
call_count += 1
# The first three calls are: native print, stream-encoding
# fallback, ASCII fallback. Make all three raise so we reach
# the final sentinel "Cannot print line..." which is expected
# to succeed (no encoding required).
if call_count <= 3:
raise UnicodeEncodeError("ascii", "x", 0, 1, "boom")
original_print(*args, **kwargs)
monkeypatch.setattr("builtins.print", fake_print)
util.safe_print("x")
assert call_count == 4
assert (
capsys.readouterr().out == "Cannot print line because of invalid locale!\n"
)