Compare commits

...

28 Commits

Author SHA1 Message Date
Jesse Hills
09dc41435c Merge pull request #16282 from esphome/bump-2026.4.5
2026.4.5
2026-05-07 08:12:15 +12:00
Jesse Hills
5283cdec12 Bump version to 2026.4.5 2026-05-07 07:25:35 +12:00
Edward Firmo
d9835c8705 [nextion] Fix text sensor state not updated on string response (#16280) 2026-05-07 07:25:35 +12:00
Mat931
b89c71c1ea [core] Fix WiFi connection in safe mode (#16269)
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
2026-05-07 07:25:35 +12:00
J. Nick Koston
7f6aef4f33 [substitutions] Fix sibling references inside dict-valued substitutions (#16273) 2026-05-07 07:25:35 +12:00
J. Nick Koston
016b509b55 [bundle] Include secrets.yaml when !secret keys are quoted (#16271) 2026-05-07 07:25:35 +12:00
Jesse Hills
d2bbaeccf3 [ha-addon] Add opt-in toggle for the new ESPHome Device Builder (#16247) 2026-05-07 07:25:35 +12:00
Jesse Hills
6fda5f41b2 Merge pull request #16240 from esphome/bump-2026.4.4
2026.4.4
2026-05-05 14:21:38 +12:00
Jesse Hills
197d4dac8e Bump version to 2026.4.4 2026-05-05 08:27:10 +12:00
Jesse Hills
2d7f9dc48d [api] Use safe_print for log output and fix safe_print bytes-repr fallback (#16160) 2026-05-05 08:27:04 +12:00
J. Nick Koston
be84e6c9f4 [api] Fall back to owning types for service array args used after a delay (#16140) 2026-05-05 08:22:05 +12:00
J. Nick Koston
0418f2138a [esp32] Drop printf wrap on IDF 6.0+ (picolibc no longer needs it) (#16189) 2026-05-05 08:22:05 +12:00
Clyde Stubbs
d9c22d6b56 [lvgl] Clamp values for meter line indicators (#16180) 2026-05-05 08:22:05 +12:00
J. Nick Koston
60a94fd109 [esp32] Replace 512B stack buffer in printf wraps with picolibc cookie FILE (#16170) 2026-05-05 08:22:05 +12:00
Jesse Hills
9371ec319a [core] Strip \\?\ prefix from sys.executable for PlatformIO subprocess (#16158) 2026-05-05 08:21:58 +12:00
J. Nick Koston
ce466c6b60 [mcp23xxx_base] Reject unsupported interrupt_pin options (inverted, allow_other_uses) (#16149) 2026-05-05 08:14:03 +12:00
Brandon Harvey
a460f5343c [automation] Fix codegen type for component.resume update_interval (#16069)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 08:14:02 +12:00
Jesse Hills
4e0509435a Merge pull request #16067 from esphome/bump-2026.4.3
2026.4.3
2026-04-28 15:39:24 +12:00
Jesse Hills
95b5ab7e78 Bump version to 2026.4.3 2026-04-28 12:58:29 +12:00
J. Nick Koston
3ac0939f55 [image] Fix RGB565+alpha rendering for multi-frame animations (#16017)
Co-authored-by: Claude <noreply@anthropic.com>
2026-04-28 12:58:29 +12:00
Jesse Hills
191d3bc7e4 [esp32_touch] Feed wdt (#16066) 2026-04-28 12:58:29 +12:00
Edward Firmo
a186f6fea9 [nextion] Unify TFT upload ack timeout to 5000ms (#15960) 2026-04-28 12:58:29 +12:00
Mat931
aea88aef5e [esp32][wifi] Fix bootloop and WiFi connection issue if nvs partition is missing or has non-default label (#16025)
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
2026-04-28 12:58:29 +12:00
J. Nick Koston
433bbdb016 [rotary_encoder][at581x] Fix templatable int field types (#16015) 2026-04-28 12:58:29 +12:00
J. Nick Koston
4137d93cbf [wifi] Fix stale wifi.connected after state transition (#15966) 2026-04-28 12:58:29 +12:00
J. Nick Koston
6a5919ee87 [deep_sleep] Fix sleep_duration codegen type to uint32_t (#15965) 2026-04-28 12:58:29 +12:00
Jesse Hills
b753ee4e94 [time] Handle Windows EINVAL when validating POSIX TZ strings (#15934) 2026-04-28 12:58:29 +12:00
Clyde Stubbs
c26ea52620 [lvgl] Triggers on tabview tabs fix (#15935) 2026-04-28 12:58:29 +12:00
51 changed files with 1075 additions and 162 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2026.4.2
PROJECT_NUMBER = 2026.4.5
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -0,0 +1,22 @@
#!/usr/bin/with-contenv bashio
# ==============================================================================
# Installs the latest prerelease of esphome-device-builder when the
# `use_new_device_builder` config option is enabled.
# This is a temporary install-on-boot step until esphome-device-builder
# becomes a direct dependency of esphome.
# ==============================================================================
if ! bashio::config.true 'use_new_device_builder'; then
exit 0
fi
bashio::log.info "Installing latest prerelease of esphome-device-builder..."
if command -v uv > /dev/null; then
uv pip install --system --no-cache-dir --prerelease=allow --upgrade \
esphome-device-builder ||
bashio::exit.nok "Failed installing esphome-device-builder."
else
pip install --no-cache-dir --pre --upgrade esphome-device-builder ||
bashio::exit.nok "Failed installing esphome-device-builder."
fi
bashio::log.info "Installed esphome-device-builder."

View File

@@ -49,5 +49,12 @@ if bashio::fs.directory_exists '/config/esphome/.esphome'; then
rm -rf /config/esphome/.esphome
fi
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "Starting ESPHome Device Builder..."
exec esphome-device-builder /config/esphome \
--ha-addon \
--ingress-port "$(bashio::addon.ingress_port)"
fi
bashio::log.info "Starting ESPHome dashboard..."
exec esphome dashboard /config/esphome --socket /var/run/esphome.sock --ha-addon

View File

@@ -4,6 +4,14 @@
# Community Hass.io Add-ons: ESPHome
# Configures NGINX for use with ESPHome
# ==============================================================================
# When the new device builder is enabled it serves HA ingress directly,
# so nginx is not used at all -- skip configuration.
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "Skipping NGINX setup: new device builder serves ingress directly."
bashio::exit.ok
fi
mkdir -p /var/log/nginx
# Generate Ingress configuration

View File

@@ -5,6 +5,14 @@
# Runs the NGINX proxy
# ==============================================================================
# The new device builder handles HA ingress itself, so nginx is bypassed.
# Block the longrun forever so s6 keeps the dependency satisfied and does
# not respawn it.
if bashio::config.true 'use_new_device_builder'; then
bashio::log.info "NGINX bypassed: new device builder serves ingress directly."
exec sleep infinity
fi
bashio::log.info "Waiting for ESPHome dashboard to come up..."
while [[ ! -S /var/run/esphome.sock ]]; do

View File

@@ -598,7 +598,7 @@ async def component_resume_action_to_code(
comp = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, comp)
if CONF_UPDATE_INTERVAL in config:
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32)
cg.add(var.set_update_interval(template_))
return var

View File

@@ -98,11 +98,13 @@ _KNOWN_FILE_EXTENSIONS = frozenset(
)
# Matches !secret references in YAML text. This is intentionally a simple
# regex scan rather than a YAML parse — it may match inside comments or
# multi-line strings, which is the conservative direction (include more
# secrets rather than fewer).
_SECRET_RE = re.compile(r"!secret\s+(\S+)")
# Matches !secret references in YAML text. An optional surrounding
# quote pair around the key is allowed and ignored: YAML treats
# ``!secret 'foo'`` and ``!secret foo`` as the same key. This is
# intentionally a simple regex scan rather than a YAML parse — it may
# match inside comments or multi-line strings, which is the conservative
# direction (include more secrets rather than fewer).
_SECRET_RE = re.compile(r"""!secret\s+['"]?([^\s'"]+)""")
def _find_used_secret_keys(yaml_files: list[Path]) -> set[str]:

View File

@@ -62,7 +62,12 @@ void Animation::set_frame(int frame) {
}
void Animation::update_data_start_() {
const uint32_t image_size = this->get_width_stride() * this->height_;
uint32_t image_size = this->get_width_stride() * this->height_;
// RGB565 with an alpha channel stores the alpha plane immediately after the RGB
// plane within each frame, so the per-frame stride includes the alpha bytes.
if (this->type_ == image::IMAGE_TYPE_RGB565 && this->transparency_ == image::TRANSPARENCY_ALPHA_CHANNEL) {
image_size += static_cast<uint32_t>(this->width_) * this->height_;
}
this->data_start_ = this->animation_data_start_ + image_size * this->current_frame_;
}

View File

@@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_(
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Owning element type for each YAML service variable type. Used to derive both
# the zero-copy native types and the owning fallback types below.
_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = {
"bool": cg.bool_,
"int": cg.int32,
"float": cg.float_,
"string": cg.std_string,
}
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Scalars are passed by value; string uses a non-owning view into rx_buf_.
**_SERVICE_ARG_SCALAR_TYPES,
"string": cg.StringRef,
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
"string[]": cg.FixedVector.template(cg.std_string)
.operator("const")
.operator("ref"),
# Arrays are passed as non-owning const references into rx_buf_.
**{
f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref")
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
# Owning fallback types used when the action chain contains non-synchronous actions
# (delay, wait_until, script.wait, etc.). The default non-owning types reference
# storage in the receive buffer, which is reused once the synchronous portion of
# the chain returns. FixedVector is also non-copyable, so the deferred lambda
# capture in DelayAction::play_complex would fail to compile.
SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = {
"string": cg.std_string,
**{
f"{name}[]": cg.std_vector.template(t)
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
CONF_ENCRYPTION = "encryption"
CONF_BATCH_DELAY = "batch_delay"
@@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None:
func_args.append((cg.bool_, "return_response"))
# Check if action chain has non-synchronous actions that would make
# non-owning StringRef dangle (rx_buf_ reused after delay)
# non-owning args (StringRef, const FixedVector&) dangle once the
# rx_buf_ is reused after a delay/wait_until/script.wait/etc. The
# FixedVector references would also fail to compile because they
# are non-copyable and DelayAction captures args by value.
has_non_synchronous = automation.has_non_synchronous_actions(
conf.get(CONF_THEN, [])
)
service_arg_names: list[str] = []
for name, var_ in conf[CONF_VARIABLES].items():
native = SERVICE_ARG_NATIVE_TYPES[var_]
# Fall back to std::string for string args if non-synchronous actions exist
if has_non_synchronous and native is cg.StringRef:
native = cg.std_string
if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES:
native = SERVICE_ARG_FALLBACK_TYPES[var_]
else:
native = SERVICE_ARG_NATIVE_TYPES[var_]
service_template_args.append(native)
func_args.append((native, name))
service_arg_names.append(name)

View File

@@ -20,6 +20,7 @@ import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE
from esphome.platformio_api import process_stacktrace
from esphome.util import safe_print
from . import CONF_ENCRYPTION
@@ -60,7 +61,6 @@ async def async_run_logs(
noise_psk=noise_psk,
addresses=addresses, # Pass all addresses for automatic retry
)
dashboard = CORE.dashboard
backtrace_state = False
# Try platform-specific stacktrace handler first, fall back to generic
@@ -82,7 +82,11 @@ async def async_run_logs(
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
)
for parsed_msg in parse_log_message(text, timestamp):
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
# safe_print handles the dashboard \033 escaping and falls back
# to backslashreplace encoding on stdouts that can't represent
# the wifi signal-bar block characters (Windows redirected
# cp1252 pipe).
safe_print(parsed_msg)
for raw_line in text.splitlines():
if platform_process_stacktrace:
backtrace_state = platform_process_stacktrace(

View File

@@ -183,19 +183,19 @@ async def at581x_settings_to_code(config, action_id, template_arg, args):
cg.add(var.set_sensing_distance(template_))
if selfcheck := config.get(CONF_POWERON_SELFCHECK_TIME):
template_ = await cg.templatable(selfcheck, args, cg.int32)
template_ = await cg.templatable(selfcheck, args, cg.int_)
cg.add(var.set_poweron_selfcheck_time(template_))
if protect := config.get(CONF_PROTECT_TIME):
template_ = await cg.templatable(protect, args, cg.int32)
template_ = await cg.templatable(protect, args, cg.int_)
cg.add(var.set_protect_time(template_))
if trig_base := config.get(CONF_TRIGGER_BASE):
template_ = await cg.templatable(trig_base, args, cg.int32)
template_ = await cg.templatable(trig_base, args, cg.int_)
cg.add(var.set_trigger_base(template_))
if trig_keep := config.get(CONF_TRIGGER_KEEP):
template_ = await cg.templatable(trig_keep, args, cg.int32)
template_ = await cg.templatable(trig_keep, args, cg.int_)
cg.add(var.set_trigger_keep(template_))
if (stage_gain := config.get(CONF_STAGE_GAIN)) is not None:

View File

@@ -413,7 +413,7 @@ async def deep_sleep_enter_to_code(config, action_id, template_arg, args):
paren = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, paren)
if CONF_SLEEP_DURATION in config:
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.int32)
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.uint32)
cg.add(var.set_sleep_duration(template_))
if CONF_UNTIL in config:

View File

@@ -1740,7 +1740,17 @@ async def to_code(config):
# Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r
# (~11 KB). See printf_stubs.cpp for implementation.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]:
#
# The wrap is only beneficial against newlib. Picolibc's tinystdio
# implements vsnprintf by building a string-output FILE and calling
# vfprintf, so vfprintf is unconditionally linked in by any caller
# of snprintf/vsnprintf — effectively every build — and the wrap
# saves nothing while costing ~170 B of shim. IDF 5.x defaults to
# newlib on every variant; IDF 6.0+ switches to picolibc on every
# variant.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version(
6, 0, 0
):
cg.add_define("USE_FULL_PRINTF")
else:
for symbol in ("vprintf", "printf", "fprintf", "vfprintf"):

View File

@@ -22,6 +22,12 @@ struct NVSData {
static std::vector<NVSData> s_pending_save; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
// open() runs from app_main() before the logger is initialized, so any failure
// must be deferred until after global_logger is set. This is emitted from the
// first make_preference() call, which runs from the generated setup() after
// log->pre_setup() has run at EARLY_INIT priority.
static esp_err_t s_open_err = ESP_OK; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
bool ESP32PreferenceBackend::save(const uint8_t *data, size_t len) {
// try find in pending saves and update that
for (auto &obj : s_pending_save) {
@@ -74,12 +80,14 @@ bool ESP32PreferenceBackend::load(uint8_t *data, size_t len) {
}
void ESP32Preferences::open() {
// Runs from app_main() before the logger is initialized; any logging here
// must be deferred. See s_open_err and make_preference() below.
nvs_flash_init();
esp_err_t err = nvs_open("esphome", NVS_READWRITE, &this->nvs_handle);
if (err == 0)
return;
ESP_LOGW(TAG, "nvs_open failed: %s - erasing NVS", esp_err_to_name(err));
s_open_err = err;
nvs_flash_deinit();
nvs_flash_erase();
nvs_flash_init();
@@ -91,6 +99,14 @@ void ESP32Preferences::open() {
}
ESPPreferenceObject ESP32Preferences::make_preference(size_t length, uint32_t type) {
if (s_open_err != ESP_OK) {
if (this->nvs_handle == 0) {
ESP_LOGW(TAG, "nvs_open failed: %s - NVS unavailable", esp_err_to_name(s_open_err));
} else {
ESP_LOGW(TAG, "nvs_open failed: %s - erased NVS", esp_err_to_name(s_open_err));
}
s_open_err = ESP_OK;
}
auto *pref = new ESP32PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
pref->nvs_handle = this->nvs_handle;
pref->key = type;

View File

@@ -1,32 +1,38 @@
/*
* Linker wrap stubs for FILE*-based printf functions.
* Linker wrap stubs for FILE*-based printf functions (newlib only).
*
* ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference
* fprintf(), printf(), vprintf(), and vfprintf() which pull in the full
* printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on
* picolibc's vfprintf). This is a separate implementation from the one
* used by snprintf/vsnprintf that handles FILE* stream I/O with buffering
* and locking.
* fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull
* in _vfprintf_r (~11 KB) — a separate implementation from the one used
* by snprintf/vsnprintf that handles FILE* stream I/O with buffering.
*
* ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(),
* so the SDK's vprintf() path is dead code at runtime. The fprintf()
* and printf() calls in SDK components are only in debug/assert paths
* (gpio_dump_io_configuration, ringbuf diagnostics) that are either
* GC'd or never called. Crash backtraces and panic output are
* unaffected they use esp_rom_printf() which is a ROM function
* and does not go through libc.
* unaffected; they use esp_rom_printf() which is a ROM function and
* does not go through libc.
*
* These stubs redirect through vsnprintf() (which uses _svfprintf_r
* already in the binary) and fwrite(), allowing the linker to
* dead-code eliminate _vfprintf_r.
* This wrap is newlib-only. On picolibc, vsnprintf is implemented as
* vfprintf into a string-output FILE, so vfprintf is unconditionally
* linked in by any caller of snprintf/vsnprintf and the wrap can never
* elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF
* on picolibc builds (IDF 6.0+ on all variants) so this file compiles
* to nothing there; the #error below catches a desynchronised gate.
*
* Saves ~11 KB of flash.
* Saves ~11 KB of flash on newlib.
*
* To disable these wraps, set enable_full_printf: true in the esp32
* advanced config section.
* To disable this wrap on newlib, set enable_full_printf: true in the
* esp32 advanced config section.
*/
#if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF)
#ifdef __PICOLIBC__
#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF"
#endif
#include <cstdarg>
#include <cstdio>
@@ -34,6 +40,9 @@
namespace esphome::esp32 {}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
static constexpr size_t PRINTF_BUFFER_SIZE = 512;
// These stubs are essentially dead code at runtime — ESPHome replaces the
@@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) {
return len;
}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
int __wrap_vprintf(const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_printf(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
@@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) {
return len;
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_fprintf(FILE *stream, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);

View File

@@ -216,6 +216,7 @@ void ESP32TouchComponent::setup() {
// Do initial oneshot scans to populate baseline values
for (uint32_t i = 0; i < ONESHOT_SCAN_COUNT; i++) {
err = touch_sensor_trigger_oneshot_scanning(this->sens_handle_, ONESHOT_SCAN_TIMEOUT_MS);
App.feed_wdt(); // 3 scans with 2s timeout might exceed WDT, so feed it here to be safe
if (err != ESP_OK) {
ESP_LOGW(TAG, "Oneshot scan %" PRIu32 " failed: %s", i, esp_err_to_name(err));
}

View File

@@ -744,21 +744,28 @@ async def write_image(config, all_frames=False):
if frame_count <= 1:
_LOGGER.warning("Image file %s has no animation frames", path)
total_rows = height * frame_count
encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha)
if byte_order := config.get(CONF_BYTE_ORDER):
# Check for valid type has already been done in validate_settings
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
# Encode each frame with its own encoder and concatenate. This keeps every
# frame self-contained on disk (e.g. RGB565+alpha emits [RGB plane | alpha plane]
# per frame) so animation frame stepping in image.cpp / animation.cpp stays
# correct without needing to know the total frame count.
byte_order = config.get(CONF_BYTE_ORDER)
combined_data: list[int] = []
encoder: ImageEncoder | None = None
for frame_index in range(frame_count):
image.seek(frame_index)
encoder = IMAGE_TYPE[type](width, height, transparency, dither, invert_alpha)
if byte_order is not None:
# Check for valid type has already been done in validate_settings
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
pixels = encoder.convert(image.resize((width, height)), path).getdata()
for row in range(height):
for col in range(width):
encoder.encode(pixels[row * width + col])
encoder.end_row()
encoder.end_image()
encoder.end_image()
combined_data.extend(encoder.data)
rhs = [HexInt(x) for x in encoder.data]
rhs = [HexInt(x) for x in combined_data]
prog_arr = cg.progmem_array(config[CONF_RAW_DATA_ID], rhs)
image_type = get_image_type_enum(type)
trans_value = get_transparency_enum(encoder.transparency)

View File

@@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) {
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) {
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) {
auto *scale = lv_obj_get_parent(obj);
auto min_value = lv_scale_get_range_min_value(scale);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) +
auto max_value = lv_scale_get_range_max_value(scale);
value = clamp(value, min_value, max_value);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) +
lv_scale_get_rotation((scale))) %
360;
}

View File

@@ -112,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images
#endif // USE_LVGL_ANIMIMG
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value);
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value);
#endif
// Parent class for things that wrap an LVGL object

View File

@@ -22,7 +22,7 @@ from ..defines import (
literal,
)
from ..lv_validation import animated, lv_int, size
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj, lv_Pvariable
from ..schemas import container_schema, part_schema
from ..types import LV_EVENT, LvType, ObjUpdateAction, lv_obj_t, lv_obj_t_ptr
from . import Widget, WidgetType, add_widgets, get_widgets, set_obj_properties
@@ -83,8 +83,8 @@ class TabviewType(WidgetType):
await w.set_property("tab_bar_size", await size.process(config[CONF_SIZE]))
for tab_conf in config[CONF_TABS]:
w_id = tab_conf[CONF_ID]
tab_obj = cg.Pvariable(w_id, cg.nullptr, type_=lv_tab_t)
tab_widget = Widget.create(w_id, tab_obj, obj_spec)
tab_obj = lv_Pvariable(lv_tab_t, w_id)
tab_widget = Widget.create(w_id, tab_obj, obj_spec, tab_conf)
lv_assign(tab_obj, lv_expr.tabview_add_tab(w.obj, tab_conf[CONF_NAME]))
await set_obj_properties(tab_widget, tab_conf)
await add_widgets(tab_widget, tab_conf)

View File

@@ -2,6 +2,7 @@ from esphome import pins
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ALLOW_OTHER_USES,
CONF_ID,
CONF_INPUT,
CONF_INTERRUPT,
@@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = {
"FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING,
}
def _validate_interrupt_pin(value):
# The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR)
# and installs a single ISR per GPIO, so neither inversion nor sharing is supported.
value = pins.internal_gpio_input_pin_schema(value)
if value.get(CONF_INVERTED):
raise cv.Invalid(
f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"the MCP23xxx INT line is fixed active-low"
)
if value.get(CONF_ALLOW_OTHER_USES):
raise cv.Invalid(
f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"sharing the interrupt pin between multiple MCP23xxx (or other components) "
"is not implemented. Remove the interrupt_pin to fall back to polling."
)
return value
MCP23XXX_CONFIG_SCHEMA = cv.Schema(
{
cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean,
cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema,
cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin,
}
).extend(cv.COMPONENT_SCHEMA)

View File

@@ -641,6 +641,7 @@ void Nextion::process_nextion_commands_() {
} else {
ESP_LOGN(TAG, "String resp: '%s' id: %s type: %s", to_process.c_str(), component->get_variable_name().c_str(),
component->get_queue_type_string());
component->set_state_from_string(to_process, true, false);
}
delete nb; // NOLINT(cppcoreguidelines-owning-memory)

View File

@@ -16,6 +16,13 @@ namespace esphome::nextion {
static const char *const TAG = "nextion.upload.arduino";
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
// Timeout for display acknowledgment during TFT upload (ms).
// A single value is used for all chunks; the happy path returns as soon as
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
// reports showed the previous 500ms steady-state value was too tight for
// some firmware variants.
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
// Followed guide
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
@@ -80,14 +87,14 @@ int Nextion::upload_by_chunks_(HTTPClient &http_client, uint32_t &range_start) {
recv_string.clear();
this->write_array(buffer, buffer_size);
App.feed_wdt();
this->recv_ret_string_(recv_string, this->upload_first_chunk_sent_ ? 500 : 5000, true);
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
this->content_length_ -= read_len;
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
ESP_LOGD(TAG, "Upload: %0.2f%% (%" PRIu32 " left, heap: %" PRIu32 ")", upload_percentage, this->content_length_,
EspClass::getFreeHeap());
this->upload_first_chunk_sent_ = true;
if (recv_string.empty()) {
ESP_LOGW(TAG, "No response from display during upload");
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
allocator.deallocate(buffer, 4096);
buffer = nullptr;
return -1;

View File

@@ -19,6 +19,13 @@ namespace esphome::nextion {
static const char *const TAG = "nextion.upload.esp32";
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
// Timeout for display acknowledgment during TFT upload (ms).
// A single value is used for all chunks; the happy path returns as soon as
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
// reports showed the previous 500ms steady-state value was too tight for
// some firmware variants.
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
// Followed guide
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
@@ -96,7 +103,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
recv_string.clear();
this->write_array(buffer, buffer_size);
App.feed_wdt();
this->recv_ret_string_(recv_string, upload_first_chunk_sent_ ? 500 : 5000, true);
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
this->content_length_ -= read_len;
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
#ifdef USE_PSRAM
@@ -109,7 +116,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
#endif
upload_first_chunk_sent_ = true;
if (recv_string.empty()) {
ESP_LOGW(TAG, "No response from display during upload");
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
allocator.deallocate(buffer, 4096);
buffer = nullptr;
return -1;

View File

@@ -129,6 +129,6 @@ async def to_code(config):
async def sensor_template_publish_to_code(config, action_id, template_arg, args):
paren = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, paren)
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int32)
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int_)
cg.add(var.set_value(template_))
return var

View File

@@ -297,7 +297,18 @@ def _push_context(
"""Resolve a variable, recursively resolving any dependencies it references."""
value = unresolved_vars.pop(key, Missing)
if value is Missing:
return Missing
# Either already resolved (in resolved_vars) or currently being
# resolved (self-reference from inside a dict-valued substitution).
# Returning what we have lets sibling references inside a dict
# value, e.g. ``${device.manufacturer}`` inside ``device.name``,
# see literal sibling values during their own resolution.
return resolved_vars.get(key, Missing)
if isinstance(value, dict):
# Dict-valued substitutions form a namespace; eagerly publish the
# original mapping so its members can reference each other while
# the dict's own substitution pass is still running. The entry is
# replaced with the fully-substituted dict once recursion returns.
resolved_vars[key] = value
try:
value = substitute(value, [], resolver_context, True)
except UndefinedError as err:

View File

@@ -1,3 +1,4 @@
import errno
from importlib import resources
import logging
@@ -74,6 +75,12 @@ def _load_tzdata(iana_key: str) -> bytes | None:
return (resources.files(package) / resource).read_bytes()
except (FileNotFoundError, ModuleNotFoundError, IsADirectoryError):
return None
except OSError as e:
# Windows raises EINVAL for paths with NTFS-illegal chars (e.g. '<'/'>'
# in POSIX TZ strings like "<+08>-8" that validate_tz feeds back here).
if e.errno == errno.EINVAL:
return None
raise
def _extract_tz_string(tzfile: bytes) -> str:

View File

@@ -1570,6 +1570,8 @@ void WiFiComponent::check_connecting_finished(uint32_t now) {
#endif
this->state_ = WIFI_COMPONENT_STATE_STA_CONNECTED;
// Refresh is_connected() cache; loop()'s refresh ran before this transition.
this->update_connected_state_();
this->num_retried_ = 0;
this->print_connect_params_();

View File

@@ -948,6 +948,8 @@ void WiFiComponent::process_pending_callbacks_() {
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
if (this->pending_.disconnect) {
this->pending_.disconnect = false;
// Refresh is_connected() cache here, not in the SDK callback (sys context).
this->update_connected_state_();
this->notify_disconnect_state_listeners_();
}
#endif

View File

@@ -179,7 +179,10 @@ void WiFiComponent::wifi_pre_setup_() {
#endif // USE_WIFI_AP
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
// cfg.nvs_enable = false;
if (global_preferences->nvs_handle == 0) {
ESP_LOGW(TAG, "starting wifi without nvs");
cfg.nvs_enable = false;
}
err = esp_wifi_init(&cfg);
if (err != ERR_OK) {
ESP_LOGE(TAG, "esp_wifi_init failed: %s", esp_err_to_name(err));
@@ -796,6 +799,8 @@ void WiFiComponent::wifi_process_event_(IDFWiFiEvent *data) {
s_sta_connected = false;
s_sta_connecting = false;
error_from_callback_ = true;
// Refresh is_connected() cache; error_from_callback_ makes it false.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -536,6 +536,8 @@ void WiFiComponent::wifi_process_event_(LTWiFiEvent *event) {
this->error_from_callback_ = true;
}
// Refresh is_connected() cache; sta_state_/error_from_callback_ make it false.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -342,6 +342,8 @@ void WiFiComponent::wifi_loop_() {
s_sta_was_connected = false;
s_sta_had_ip = false;
ESP_LOGV(TAG, "Disconnected");
// Refresh is_connected() cache; driver link status reports disconnected.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2026.4.2"
__version__ = "2026.4.5"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -562,14 +562,9 @@ async def _add_controller_registry_define() -> None:
@coroutine_with_priority(CoroPriority.FINAL)
async def _add_looping_components() -> None:
# Emit a constexpr that computes the looping component count at C++ compile time
# and pre-init the FixedVector with the exact capacity. Uses std::is_same_v to
# detect loop() overrides. The constexpr goes in main.cpp's global section where
# all component types are in scope. calculate_looping_components_() then skips
# the counting pass and only does the two population passes.
# Emit ESPHOME_LOOPING_COMPONENT_COUNT. Sizing of looping_components_
# happens in core to_code() so it lands before safe_mode's early return.
entries = CORE.data.get("looping_component_entries", [])
if not entries:
return
# Build constexpr sum for the exact count, deduplicating by type
# Uses HasLoopOverride<T> which handles ambiguous &T::loop from multiple inheritance
@@ -577,7 +572,7 @@ async def _add_looping_components() -> None:
terms = [
f"({count} * HasLoopOverride<{cpp_type}>::value)"
for cpp_type, count in type_counts.items()
]
] or ["0"]
constexpr_expr = " + \\\n ".join(terms)
cg.add_global(
cg.RawStatement(
@@ -586,14 +581,6 @@ async def _add_looping_components() -> None:
)
)
# Pre-init FixedVector with exact capacity so calculate_looping_components_()
# can skip the counting pass
cg.add(
cg.RawExpression(
"App.looping_components_.init(ESPHOME_LOOPING_COMPONENT_COUNT)"
)
)
@coroutine_with_priority(CoroPriority.CORE)
async def to_code(config: ConfigType) -> None:
@@ -636,6 +623,14 @@ async def to_code(config: ConfigType) -> None:
# Define component count for static allocation
cg.add_define("ESPHOME_COMPONENT_COUNT", len(CORE.component_ids))
# Pre-init FixedVector with exact capacity so calculate_looping_components_()
# can skip the counting pass
cg.add(
cg.RawExpression(
"App.looping_components_.init(ESPHOME_LOOPING_COMPONENT_COUNT)"
)
)
CORE.add_job(_add_platform_defines)
CORE.add_job(_add_controller_registry_define)
CORE.add_job(_add_looping_components)

View File

@@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [
]
def _strip_win_long_path_prefix(path: str) -> str:
r"""Strip the Windows extended-length path prefix from ``path``.
Handles both forms documented at
https://learn.microsoft.com/windows/win32/fileio/naming-a-file:
* ``\\?\C:\path\to\file`` -> ``C:\path\to\file``
* ``\\?\UNC\server\share\path`` -> ``\\server\share\path``
The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with
``sys.executable`` already prefixed with ``\\?\``. That prefix propagates
into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from
the environment, falling back to ``os.path.normpath(sys.executable)``)
and ends up baked into SCons-emitted command lines for build steps such
as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand
the ``\\?\`` prefix, so the build fails with
"The system cannot find the path specified." Stripping the prefix early
keeps the path shell-quotable.
No-op on non-Windows platforms.
"""
if sys.platform != "win32":
return path
if path.startswith("\\\\?\\UNC\\"):
# \\?\UNC\server\share\... -> \\server\share\...
return "\\\\" + path[len("\\\\?\\UNC\\") :]
if path.startswith("\\\\?\\"):
return path[len("\\\\?\\") :]
return path
def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
@@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
# Increase uv retry count to handle transient network errors (default is 3)
os.environ.setdefault("UV_HTTP_RETRIES", "10")
cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args)
# Strip the Windows extended-length path prefix from sys.executable so it
# doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted
# command lines run through cmd.exe.
python_exe = _strip_win_long_path_prefix(sys.executable)
if python_exe != sys.executable:
# Only override PYTHONEXEPATH when we actually stripped a prefix.
# PlatformIO's get_pythonexe_path() reads this and falls back to
# sys.executable otherwise; setting it unconditionally would clobber
# a user-provided value (or the unmodified path on platforms that
# don't need the strip).
os.environ["PYTHONEXEPATH"] = python_exe
cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args)
return run_external_process(*cmd, **kwargs)

View File

@@ -94,13 +94,29 @@ def safe_print(message="", end="\n"):
except UnicodeEncodeError:
pass
# Fall back to the stream's actual encoding (e.g. cp1252 on Windows
# redirected pipes). Use "backslashreplace" so unencodable code points
# like the wifi signal-bar block characters (U+2582..U+2588) become
# readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print``
# never receives a ``bytes`` object (which would render as a ``b'...'``
# repr).
encoding = sys.stdout.encoding or "ascii"
try:
print(message.encode("utf-8", "backslashreplace"), end=end)
print(
message.encode(encoding, "backslashreplace").decode(encoding),
end=end,
)
return
except UnicodeEncodeError:
try:
print(message.encode("ascii", "backslashreplace"), end=end)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
pass
try:
print(
message.encode("ascii", "backslashreplace").decode("ascii"),
end=end,
)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
def safe_input(prompt=""):

View File

@@ -7,10 +7,12 @@ from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
from PIL import Image as PILImage
import pytest
from esphome import config_validation as cv
from esphome.components.image import (
CONF_ALPHA_CHANNEL,
CONF_INVERT_ALPHA,
CONF_OPAQUE,
CONF_TRANSPARENCY,
@@ -411,3 +413,70 @@ async def test_svg_with_mm_dimensions_succeeds(
assert 30 < height < 50, (
f"Height should be around 39 pixels for 10mm at 100dpi, got {height}"
)
@pytest.mark.asyncio
async def test_rgb565_alpha_animation_layout_per_frame(
tmp_path: Path,
mock_progmem_array: MagicMock,
) -> None:
"""RGB565+alpha animations must store each frame as a self-contained
[RGB plane | alpha plane] block. Animation::update_data_start_ steps frames
with a single per-frame stride, so any cross-frame layout (all RGB then all
alpha) makes the C++ alpha read land in the next frame's RGB bytes — that
was the regression behind issue #15999.
"""
# Build a 2-frame APNG where each frame is a solid color with a known
# alpha. APNG preserves full RGBA per pixel (GIF only has 1-bit alpha so
# round-tripping mid-range alpha values does not work). Frame 0 is fully
# opaque red, frame 1 is fully transparent blue.
width = 4
height = 3
frame0 = PILImage.new("RGBA", (width, height), (255, 0, 0, 0xFF))
frame1 = PILImage.new("RGBA", (width, height), (0, 0, 255, 0x00))
apng_path = tmp_path / "anim.png"
frame0.save(
apng_path,
format="PNG",
save_all=True,
append_images=[frame1],
duration=100,
loop=0,
)
config = {
CONF_FILE: str(apng_path),
CONF_TYPE: "RGB565",
CONF_TRANSPARENCY: CONF_ALPHA_CHANNEL,
CONF_DITHER: "NONE",
CONF_INVERT_ALPHA: False,
CONF_RAW_DATA_ID: "test_raw_data_id",
}
_, _, _, _, _, frame_count = await write_image(config, all_frames=True)
assert frame_count == 2
# Recover the bytes handed to progmem_array. Signature is (id_, rhs).
_, raw_data = mock_progmem_array.call_args.args
data = [int(x) for x in raw_data]
rgb_size = width * height * 2
alpha_size = width * height
frame_size = rgb_size + alpha_size
assert len(data) == frame_size * frame_count, (
"RGB565+alpha animation buffer must be (RGB + alpha) per frame, not "
"all RGB followed by all alpha"
)
# Frame 0: RGB plane is red, alpha plane is 0xFF. Frame 1: alpha plane is
# 0x00. If the layout regresses to [all RGB | all alpha], the alpha bytes
# would all land at the tail of the buffer and the per-frame slices below
# would point at RGB565 noise instead.
frame0_alpha = data[rgb_size : rgb_size + alpha_size]
frame1_alpha = data[frame_size + rgb_size : frame_size + rgb_size + alpha_size]
assert all(a == 0xFF for a in frame0_alpha), (
f"Frame 0 alpha plane should be opaque, got {frame0_alpha}"
)
assert all(a == 0x00 for a in frame1_alpha), (
f"Frame 1 alpha plane should be transparent, got {frame1_alpha}"
)

View File

@@ -91,6 +91,24 @@ api:
- float_arr.size()
- string_arr[0].c_str()
- string_arr.size()
# Test array + string args used after a non-synchronous action (delay).
# The default non-owning types (StringRef, const FixedVector&) would
# dangle once rx_buf_ is reused, and FixedVector is non-copyable so
# DelayAction's lambda capture would fail to compile. The api codegen
# must fall back to owning std::string / std::vector here.
- action: array_with_delay
variables:
name: string
int_arr: int[]
string_arr: string[]
then:
- delay: 20ms
- logger.log:
format: "Delayed: %s (%u ints, %u strings)"
args:
- name.c_str()
- int_arr.size()
- string_arr.size()
# Test ContinuationAction (IfAction with then/else branches)
- action: test_if_action
variables:

View File

@@ -4,3 +4,9 @@ esphome:
- deep_sleep.prevent
- delay: 1s
- deep_sleep.allow
- if:
condition:
lambda: 'return false;'
then:
- deep_sleep.enter:
sleep_duration: 60min

View File

@@ -501,14 +501,15 @@ async def _read_stream_lines(
@asynccontextmanager
async def run_binary_and_wait_for_port(
async def run_binary(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
) -> AsyncGenerator[tuple[asyncio.subprocess.Process, list[str]]]:
"""Run a binary under a PTY, capture log output, and clean up on exit.
Yields the running ``Process`` and a live list of captured log lines.
No port wait -- callers that need that should use
``run_binary_and_wait_for_port``."""
# Create a pseudo-terminal to make the binary think it's running interactively
# This is needed because the ESPHome host logger checks isatty()
controller_fd, device_fd = pty.openpty()
@@ -535,7 +536,6 @@ async def run_binary_and_wait_for_port(
controller_transport, _ = await loop.connect_read_pipe(
lambda: controller_protocol, os.fdopen(controller_fd, "rb", 0)
)
output_reader = controller_reader
if process.returncode is not None:
raise RuntimeError(
@@ -543,27 +543,59 @@ async def run_binary_and_wait_for_port(
"Ensure the binary is valid and can run successfully."
)
# Wait for the API server to start listening
loop = asyncio.get_running_loop()
start_time = loop.time()
# Start collecting output
stdout_lines: list[str] = []
output_tasks: list[asyncio.Task] = []
output_task = asyncio.create_task(
_read_stream_lines(controller_reader, stdout_lines, sys.stdout, line_callback)
)
try:
# Read from output stream
output_tasks = [
asyncio.create_task(
_read_stream_lines(
output_reader, stdout_lines, sys.stdout, line_callback
)
)
]
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
yield process, stdout_lines
finally:
output_task.cancel()
result = await asyncio.gather(output_task, return_exceptions=True)
if isinstance(result[0], Exception) and not isinstance(
result[0], asyncio.CancelledError
):
print(f"Error reading from PTY: {result[0]}", file=sys.stderr)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def run_binary_and_wait_for_port(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
async with run_binary(binary_path, line_callback=line_callback) as (
process,
stdout_lines,
):
loop = asyncio.get_running_loop()
start_time = loop.time()
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
@@ -593,41 +625,6 @@ async def run_binary_and_wait_for_port(
raise TimeoutError(error_msg)
finally:
# Cancel output collection tasks
for task in output_tasks:
task.cancel()
# Wait for tasks to complete and check for exceptions
results = await asyncio.gather(*output_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(
result, asyncio.CancelledError
):
print(
f"Error reading from PTY: {result}",
file=sys.stderr,
)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def run_compiled_context(

View File

@@ -0,0 +1,25 @@
esphome:
name: safe-mode-loop-runs
host:
logger:
safe_mode:
num_attempts: 10
on_safe_mode:
- lambda: |-
// Spawn a detached thread that logs a unique marker. The
// non-main-thread log goes through the task log buffer, which
// is only drained by Logger::loop(). If looping components
// weren't initialized (the bug fixed in #16269), the buffer is
// never read and the marker never reaches the console.
struct MarkerThread {
static void *thread_func(void *) {
ESP_LOGI("safe_mode_test", "looping component ran in safe mode");
return nullptr;
}
};
pthread_t t;
pthread_create(&t, nullptr, MarkerThread::thread_func, nullptr);
pthread_detach(t);

View File

@@ -0,0 +1,39 @@
"""Helpers for manipulating the host platform's preferences file.
ESPHome's host platform stores preferences in
``~/.esphome/prefs/<app_name>.prefs`` using a simple binary layout that
mirrors ``HostPreferences::sync()``:
``[uint32_t key][uint8_t len][uint8_t data[len]]`` per entry.
Tests use these helpers to pre-populate state the binary will see at
boot (e.g. forcing safe mode) or to clear stale state between runs.
"""
from __future__ import annotations
from pathlib import Path
import struct
def host_prefs_path(device_name: str) -> Path:
"""Return the on-disk prefs file path for a host-platform device."""
return Path.home() / ".esphome" / "prefs" / f"{device_name}.prefs"
def clear_host_prefs(device_name: str) -> None:
"""Delete the prefs file for a host-platform device, if it exists."""
host_prefs_path(device_name).unlink(missing_ok=True)
def write_host_pref(device_name: str, key: int, data: bytes) -> Path:
"""Write a single preference entry, replacing the file's contents.
Returns the path that was written.
"""
if len(data) > 255:
raise ValueError(f"Preference data too long: {len(data)} bytes (max 255)")
path = host_prefs_path(device_name)
path.parent.mkdir(parents=True, exist_ok=True)
payload = struct.pack("<IB", key, len(data)) + data
path.write_bytes(payload)
return path

View File

@@ -0,0 +1,94 @@
"""Regression test for safe_mode + looping_components init ordering.
Reproduces the bug fixed in https://github.com/esphome/esphome/pull/16269:
``App.looping_components_.init(...)`` was emitted at ``CoroPriority.FINAL``,
which placed it *after* the ``safe_mode`` early-return in ``setup_app()``.
When safe mode was entered, the ``FixedVector`` backing the looping-component
list was never sized, ``looping_components_active_end_`` stayed at 0, and
``loop()`` iterated zero components -- so any looping component above
``CoroPriority.APPLICATION`` (e.g. wifi, logger) never ran.
The test forces safe mode by writing ``ENTER_SAFE_MODE_MAGIC`` to the host
preferences file before booting, then asserts that ``Logger::loop()`` runs
by logging from a non-main thread. Non-main-thread logs are buffered in
``TaskLogBuffer`` and only emitted to the console when ``Logger::loop()``
drains the buffer. Without the fix, the marker stays in the buffer
forever; with the fix, it reaches the console.
The API server (``CoroPriority.WEB``, 40) is registered below safe_mode
(``CoroPriority.APPLICATION``, 50), so it's never set up when safe mode
is active and ``run_compiled`` would hang waiting for the API port.
This test uses ``run_binary`` directly to skip the port wait.
"""
from __future__ import annotations
import asyncio
import re
import struct
import pytest
from .conftest import run_binary
from .host_prefs import clear_host_prefs, write_host_pref
from .types import CompileFunction, ConfigWriter
# Must match esphome::safe_mode::RTC_KEY in safe_mode.h
SAFE_MODE_RTC_KEY = 233825507
# Must match esphome::safe_mode::SafeModeComponent::ENTER_SAFE_MODE_MAGIC
ENTER_SAFE_MODE_MAGIC = 0x5AFE5AFE
DEVICE_NAME = "safe-mode-loop-runs"
THREAD_LOG_MARKER = "looping component ran in safe mode"
@pytest.mark.asyncio
async def test_safe_mode_loop_runs(
yaml_config: str,
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
) -> None:
"""When safe mode is active, ``App.loop()`` must still iterate looping
components -- proven here by a thread-logged marker reaching the
console (which requires ``Logger::loop()`` to run)."""
config_path = await write_yaml_config(yaml_config)
binary_path = await compile_esphome(config_path)
# Compile finished successfully; pre-populate prefs so the *next* run
# enters safe mode immediately.
write_host_pref(
DEVICE_NAME, SAFE_MODE_RTC_KEY, struct.pack("<I", ENTER_SAFE_MODE_MAGIC)
)
try:
loop = asyncio.get_running_loop()
safe_mode_active = loop.create_future()
thread_log_seen = loop.create_future()
safe_mode_pattern = re.compile(r"SAFE MODE IS ACTIVE")
thread_log_pattern = re.compile(re.escape(THREAD_LOG_MARKER))
def on_log(line: str) -> None:
if not safe_mode_active.done() and safe_mode_pattern.search(line):
safe_mode_active.set_result(True)
if not thread_log_seen.done() and thread_log_pattern.search(line):
thread_log_seen.set_result(True)
async with run_binary(binary_path, line_callback=on_log):
try:
await asyncio.wait_for(safe_mode_active, timeout=15.0)
except TimeoutError:
pytest.fail(
"Did not observe 'SAFE MODE IS ACTIVE' -- safe mode "
"didn't trigger, so this test isn't exercising the bug."
)
try:
await asyncio.wait_for(thread_log_seen, timeout=10.0)
except TimeoutError:
pytest.fail(
f"Did not observe thread-logged marker {THREAD_LOG_MARKER!r} "
"within timeout. Logger::loop() never drained the task "
"log buffer, meaning App.looping_components_ was never "
"sized -- this is the regression #16269 fixed."
)
finally:
clear_host_prefs(DEVICE_NAME)

View File

@@ -9,7 +9,6 @@ Tests that:
from __future__ import annotations
import asyncio
from pathlib import Path
import socket
from typing import Any
@@ -17,9 +16,12 @@ from aioesphomeapi import TextInfo, TextState
import pytest
from .conftest import run_binary_and_wait_for_port, wait_and_connect_api_client
from .host_prefs import clear_host_prefs
from .state_utils import InitialStateHelper, require_entity
from .types import CompileFunction, ConfigWriter
DEVICE_NAME = "host-template-text-save-test"
@pytest.mark.asyncio
async def test_template_text_save(
@@ -32,11 +34,7 @@ async def test_template_text_save(
port, port_socket = reserved_tcp_port
# Clean up any stale preference file from previous runs
prefs_file = (
Path.home() / ".esphome" / "prefs" / "host-template-text-save-test.prefs"
)
if prefs_file.exists():
prefs_file.unlink()
clear_host_prefs(DEVICE_NAME)
# Write and compile once
config_path = await write_yaml_config(yaml_config)
@@ -59,7 +57,7 @@ async def test_template_text_save(
wait_and_connect_api_client(port=port) as client,
):
device_info = await client.device_info()
assert device_info.name == "host-template-text-save-test"
assert device_info.name == DEVICE_NAME
entities, _ = await client.list_entities_services()
text_entity = require_entity(
@@ -127,5 +125,4 @@ async def test_template_text_save(
)
# Clean up preference file
if prefs_file.exists():
prefs_file.unlink()
clear_host_prefs(DEVICE_NAME)

View File

@@ -1,6 +1,11 @@
"""Tests for time component cron expression parsing."""
from esphome.components.time import _parse_cron_part
import errno
from unittest.mock import MagicMock, patch
import pytest
from esphome.components.time import _load_tzdata, _parse_cron_part, validate_tz
def test_star_slash_seconds() -> None:
@@ -78,3 +83,63 @@ def test_range() -> None:
def test_single_value() -> None:
assert _parse_cron_part("30", 0, 59, {}) == {30}
def _mock_resources_with_error(error: Exception) -> MagicMock:
"""Return a mock of importlib.resources.files where read_bytes raises error."""
leaf = MagicMock()
leaf.read_bytes.side_effect = error
package = MagicMock()
package.__truediv__.return_value = leaf
return MagicMock(return_value=package)
def test_load_tzdata_returns_none_on_windows_einval() -> None:
"""On Windows, opening a tzdata path with NTFS-illegal chars raises OSError(EINVAL).
Regression test for crash when the system TZ resolves to a POSIX string like
"<+08>-8" (Asia/Shanghai, IST, etc.) and is fed back into _load_tzdata by
validate_tz to check whether it is also a valid IANA key.
"""
err = OSError(errno.EINVAL, "Invalid argument")
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(err),
):
assert _load_tzdata("<+08>-8") is None
def test_load_tzdata_propagates_unexpected_oserror() -> None:
"""Unrelated OSErrors (e.g. PermissionError) must not be swallowed."""
with (
patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(
PermissionError(errno.EACCES, "Permission denied")
),
),
pytest.raises(PermissionError),
):
_load_tzdata("Some/Zone")
def test_load_tzdata_returns_none_on_file_not_found() -> None:
"""Existing behavior: missing tz file returns None rather than raising."""
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(FileNotFoundError()),
):
assert _load_tzdata("Not/A/Zone") is None
def test_validate_tz_accepts_posix_string_when_read_bytes_raises_einval() -> None:
"""validate_tz must not crash when _load_tzdata hits the Windows EINVAL path.
Simulates the Windows case where the auto-detected POSIX TZ string is fed
back through _load_tzdata and the underlying read_bytes raises errno 22.
"""
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(OSError(errno.EINVAL, "Invalid argument")),
):
assert validate_tz("<+08>-8") == "<+08>-8"

View File

@@ -10,6 +10,7 @@ from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import config_validation as cv, core
from esphome.components.safe_mode import to_code as safe_mode_to_code
from esphome.const import (
CONF_AREA,
CONF_AREAS,
@@ -312,6 +313,75 @@ def test_add_platform_defines_priority() -> None:
)
def test_to_code_priority_above_safe_mode() -> None:
"""Test that core to_code emits the looping_components_ init before safe_mode.
Regression test for https://github.com/esphome/esphome/issues/16262.
safe_mode emits an `if (should_enter_safe_mode(...)) return;` line in main()
at APPLICATION priority. The `App.looping_components_.init(...)` call must be
emitted at a higher priority than APPLICATION so it lands in main() before
the early return; otherwise the FixedVector is never sized when safe mode is
active and loop() never runs (Wi-Fi never connects).
"""
assert config.to_code.priority > safe_mode_to_code.priority, (
f"core to_code priority ({config.to_code.priority}) must be greater than "
f"safe_mode to_code priority ({safe_mode_to_code.priority}) so that "
"App.looping_components_.init() is emitted before safe_mode's early return"
)
@pytest.mark.asyncio
async def test_add_looping_components_handles_empty_entries() -> None:
"""Test that _add_looping_components emits a valid constexpr when there are
no looping component entries.
With zero entries the generated constexpr must still be syntactically valid
C++ (`= 0;`), not an empty expression (`= ;`). This guards the empty-list
case that would otherwise produce uncompilable main.cpp output.
"""
CORE.data["looping_component_entries"] = []
await config._add_looping_components()
constexpr_lines = [
str(s)
for s in CORE.global_statements
if "ESPHOME_LOOPING_COMPONENT_COUNT" in str(s)
]
assert len(constexpr_lines) == 1
text = constexpr_lines[0]
assert "static constexpr size_t ESPHOME_LOOPING_COMPONENT_COUNT" in text
# The right-hand side must contain a literal `0`, not be empty.
rhs = text.split("=", 1)[1]
assert "0" in rhs
assert rhs.strip().rstrip(";").strip(), (
f"constexpr right-hand side must not be empty, got: {text!r}"
)
@pytest.mark.asyncio
async def test_add_looping_components_with_entries() -> None:
"""Test that _add_looping_components builds a HasLoopOverride sum from entries."""
CORE.data["looping_component_entries"] = [
"esphome::wifi::WiFiComponent",
"esphome::logger::Logger",
"esphome::wifi::WiFiComponent",
]
await config._add_looping_components()
constexpr_lines = [
str(s)
for s in CORE.global_statements
if "ESPHOME_LOOPING_COMPONENT_COUNT" in str(s)
]
assert len(constexpr_lines) == 1
text = constexpr_lines[0]
# Deduplicated by type, with per-type counts as multiplier.
assert "(2 * HasLoopOverride<esphome::wifi::WiFiComponent>::value)" in text
assert "(1 * HasLoopOverride<esphome::logger::Logger>::value)" in text
def test_valid_include_with_angle_brackets() -> None:
"""Test valid_include accepts angle bracket includes."""
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"

View File

@@ -0,0 +1,16 @@
substitutions:
device:
manufacturer: espressif
model: esp32
mac_suffix: ffffff
name: espressif-esp32-ffffff
network:
host: example.com
port: 8080
url: http://example.com:8080/api
esphome:
name: espressif-esp32-ffffff
test_list:
- espressif-esp32-ffffff
- http://example.com:8080/api
- espressif/esp32

View File

@@ -0,0 +1,18 @@
substitutions:
device:
manufacturer: "espressif"
model: "esp32"
mac_suffix: "ffffff"
name: ${device.manufacturer}-${device.model}-${device.mac_suffix}
network:
host: "example.com"
port: 8080
url: "http://${network.host}:${network.port}/api"
esphome:
name: ${device.name}
test_list:
- ${device.name}
- ${network.url}
- "${device.manufacturer}/${device.model}"

View File

@@ -170,6 +170,23 @@ def test_find_used_secret_keys_deduplicates(tmp_path: Path) -> None:
assert keys == {"key1"}
def test_find_used_secret_keys_quoted(tmp_path: Path) -> None:
"""Quoted !secret keys should resolve to the same key as unquoted form.
YAML strips surrounding quotes during parsing, so the secrets.yaml
lookup uses the unquoted key. The bundle scan must do the same.
"""
yaml1 = tmp_path / "a.yaml"
yaml1.write_text(
"single: !secret 'wifi_ssid'\n"
'double: !secret "wifi_pw"\n'
"bare: !secret api_key\n"
)
keys = _find_used_secret_keys([yaml1])
assert keys == {"wifi_ssid", "wifi_pw", "api_key"}
# ---------------------------------------------------------------------------
# _add_bytes_to_tar
# ---------------------------------------------------------------------------
@@ -1217,6 +1234,35 @@ def test_create_bundle_filters_secrets(tmp_path: Path) -> None:
assert "should_not_appear" not in secrets_data
def test_create_bundle_filters_secrets_quoted(tmp_path: Path) -> None:
"""Bundling must include secrets.yaml when !secret keys are quoted.
Regression test for issue 16259: quoted !secret references previously
captured the quotes as part of the key, so no key matched secrets.yaml
entries and the secrets file was dropped from the bundle entirely.
"""
config_dir = _setup_config_dir(tmp_path)
secrets = config_dir / "secrets.yaml"
secrets.write_text("ota_password: hunter2\nunused: should_not_appear\n")
config_yaml = "ota:\n password: !secret 'ota_password'\n"
(config_dir / "test.yaml").write_text(config_yaml)
creator = ConfigBundleCreator({})
result = creator.create_bundle()
assert result.manifest[ManifestKey.HAS_SECRETS] is True
buf = io.BytesIO(result.data)
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
secrets_data = tar.extractfile("secrets.yaml").read().decode()
assert "ota_password" in secrets_data
assert "hunter2" in secrets_data
assert "unused" not in secrets_data
def test_create_bundle_no_secrets(tmp_path: Path) -> None:
_setup_config_dir(tmp_path)

View File

@@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables(
assert "arg" in args
@pytest.mark.parametrize(
("platform", "input_path", "expected"),
[
# win32: drive-letter extended-length prefix is stripped
(
"win32",
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# win32: UNC extended-length prefix is translated to a regular UNC path
(
"win32",
"\\\\?\\UNC\\server\\share\\python.exe",
"\\\\server\\share\\python.exe",
),
# win32: paths without the prefix are returned unchanged
(
"win32",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# non-win32: prefix is left alone (no-op)
("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"),
("darwin", "/usr/bin/python3", "/usr/bin/python3"),
],
)
def test_strip_win_long_path_prefix(
platform: str, input_path: str, expected: str
) -> None:
r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32."""
with patch("esphome.platformio_api.sys.platform", platform):
assert platformio_api._strip_win_long_path_prefix(input_path) == expected
def test_run_platformio_cli_strips_win_long_path_prefix(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess.
The NSIS-installed esphome.exe launcher starts Python with
``sys.executable`` already prefixed by the extended-length path marker.
That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and
break SCons-emitted command lines run through ``cmd.exe``.
"""
CORE.build_path = str(setup_core / "build" / "test")
prefixed_exe = (
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
stripped_exe = (
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "win32"),
patch("esphome.platformio_api.sys.executable", prefixed_exe),
):
# Pop any pre-existing PYTHONEXEPATH so the assertion below reflects
# what run_platformio_cli set, not whatever the test runner's
# environment happened to contain.
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# The subprocess is invoked with the stripped executable path.
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == stripped_exe
# PYTHONEXEPATH is exported with the stripped path so PlatformIO's
# get_pythonexe_path() picks it up in the subprocess.
assert os.environ["PYTHONEXEPATH"] == stripped_exe
def test_run_platformio_cli_does_not_set_pythonexepath_without_strip(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix.
Setting it unconditionally would clobber a user-provided value (or
interfere with non-Windows tooling that has no prefix to strip).
"""
CORE.build_path = str(setup_core / "build" / "test")
plain_exe = "/usr/bin/python3"
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "linux"),
patch("esphome.platformio_api.sys.executable", plain_exe),
):
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == plain_exe
assert "PYTHONEXEPATH" not in os.environ
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:

View File

@@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None:
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
class TestSafePrint:
"""Tests for ``safe_print`` and its UnicodeEncodeError fallback chain."""
@pytest.fixture(autouse=True)
def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Default ``CORE.dashboard`` to False so each test starts hermetic."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", False)
def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None:
"""ASCII-only messages take the fast path through native ``print``."""
util.safe_print("hello world")
assert capsys.readouterr().out == "hello world\n"
def test_prints_unicode_on_utf8_stdout(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Non-ASCII goes straight through when stdout can encode it."""
util.safe_print("bars: \u2582\u2584\u2586\u2588")
assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n"
def test_dashboard_escapes_esc_byte(
self,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
util.safe_print("\033[0;32mhi\033[0m")
assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n"
def test_fallback_writes_string_not_bytes_repr(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Regression: cp1252 fallback must produce a printable str, not ``b'...'``.
On Windows, when stdout is a redirected pipe (e.g. the dashboard),
Python uses cp1252, which cannot encode the wifi signal-bar block
characters (U+2582..U+2588). The previous fallback path called
``print(message.encode(...))`` with a ``bytes`` object, which
Python's ``print`` rendered as a literal ``b'...'`` repr — visible
in the user's dashboard output. The fix re-encodes through the
stream's encoding with ``backslashreplace`` and decodes back to
``str``.
"""
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("bars: \u2582\u2584\u2586\u2588 done")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Output is a clean line, not the bytes repr.
assert not output.startswith("b'")
assert "b'bars" not in output
# Unencodable codepoints become readable backslash escapes.
assert "\\u2582\\u2584\\u2586\\u2588" in output
# Encodable parts survive unchanged.
assert "bars: " in output
assert " done" in output
assert output.endswith("\n")
def test_fallback_with_dashboard_escaped_message(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Dashboard ESC escaping + cp1252 fallback compose correctly."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Dashboard escaping turned ESC into literal "\033" (5 chars), which
# cp1252 can encode, so it survives the round-trip verbatim.
assert "\\033[0;32m" in output
assert "\\033[0m" in output
# Block characters became backslash escapes via backslashreplace.
assert "\\u2582\\u2584\\u2586\\u2588" in output
def test_final_message_when_locale_is_invalid(
self,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
"""If every encoding path fails, surface the locale-error sentinel."""
original_print = print
call_count = 0
def fake_print(*args: Any, **kwargs: Any) -> None:
nonlocal call_count
call_count += 1
# The first three calls are: native print, stream-encoding
# fallback, ASCII fallback. Make all three raise so we reach
# the final sentinel "Cannot print line..." which is expected
# to succeed (no encoding required).
if call_count <= 3:
raise UnicodeEncodeError("ascii", "x", 0, 1, "boom")
original_print(*args, **kwargs)
monkeypatch.setattr("builtins.print", fake_print)
util.safe_print("x")
assert call_count == 4
assert (
capsys.readouterr().out == "Cannot print line because of invalid locale!\n"
)