Compare commits

...

32 Commits

Author SHA1 Message Date
Jesse Hills
6fda5f41b2 Merge pull request #16240 from esphome/bump-2026.4.4
2026.4.4
2026-05-05 14:21:38 +12:00
Jesse Hills
197d4dac8e Bump version to 2026.4.4 2026-05-05 08:27:10 +12:00
Jesse Hills
2d7f9dc48d [api] Use safe_print for log output and fix safe_print bytes-repr fallback (#16160) 2026-05-05 08:27:04 +12:00
J. Nick Koston
be84e6c9f4 [api] Fall back to owning types for service array args used after a delay (#16140) 2026-05-05 08:22:05 +12:00
J. Nick Koston
0418f2138a [esp32] Drop printf wrap on IDF 6.0+ (picolibc no longer needs it) (#16189) 2026-05-05 08:22:05 +12:00
Clyde Stubbs
d9c22d6b56 [lvgl] Clamp values for meter line indicators (#16180) 2026-05-05 08:22:05 +12:00
J. Nick Koston
60a94fd109 [esp32] Replace 512B stack buffer in printf wraps with picolibc cookie FILE (#16170) 2026-05-05 08:22:05 +12:00
Jesse Hills
9371ec319a [core] Strip \\?\ prefix from sys.executable for PlatformIO subprocess (#16158) 2026-05-05 08:21:58 +12:00
J. Nick Koston
ce466c6b60 [mcp23xxx_base] Reject unsupported interrupt_pin options (inverted, allow_other_uses) (#16149) 2026-05-05 08:14:03 +12:00
Brandon Harvey
a460f5343c [automation] Fix codegen type for component.resume update_interval (#16069)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 08:14:02 +12:00
Jesse Hills
4e0509435a Merge pull request #16067 from esphome/bump-2026.4.3
2026.4.3
2026-04-28 15:39:24 +12:00
Jesse Hills
95b5ab7e78 Bump version to 2026.4.3 2026-04-28 12:58:29 +12:00
J. Nick Koston
3ac0939f55 [image] Fix RGB565+alpha rendering for multi-frame animations (#16017)
Co-authored-by: Claude <noreply@anthropic.com>
2026-04-28 12:58:29 +12:00
Jesse Hills
191d3bc7e4 [esp32_touch] Feed wdt (#16066) 2026-04-28 12:58:29 +12:00
Edward Firmo
a186f6fea9 [nextion] Unify TFT upload ack timeout to 5000ms (#15960) 2026-04-28 12:58:29 +12:00
Mat931
aea88aef5e [esp32][wifi] Fix bootloop and WiFi connection issue if nvs partition is missing or has non-default label (#16025)
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
2026-04-28 12:58:29 +12:00
J. Nick Koston
433bbdb016 [rotary_encoder][at581x] Fix templatable int field types (#16015) 2026-04-28 12:58:29 +12:00
J. Nick Koston
4137d93cbf [wifi] Fix stale wifi.connected after state transition (#15966) 2026-04-28 12:58:29 +12:00
J. Nick Koston
6a5919ee87 [deep_sleep] Fix sleep_duration codegen type to uint32_t (#15965) 2026-04-28 12:58:29 +12:00
Jesse Hills
b753ee4e94 [time] Handle Windows EINVAL when validating POSIX TZ strings (#15934) 2026-04-28 12:58:29 +12:00
Clyde Stubbs
c26ea52620 [lvgl] Triggers on tabview tabs fix (#15935) 2026-04-28 12:58:29 +12:00
Jesse Hills
6ca5b31fab Merge pull request #15933 from esphome/bump-2026.4.2
2026.4.2
2026-04-23 14:10:10 +12:00
Jesse Hills
00b71208a6 Bump version to 2026.4.2 2026-04-23 11:18:39 +12:00
Keith Burzinski
76eb8f697f [usb_uart] Derive TX output chunk count from buffer_size config (#15909) 2026-04-23 11:18:39 +12:00
Jonathan Swoboda
2a3bd8bc85 [io_expanders] Self-heal interrupt-driven expanders when INT stays asserted across the read (#15923) 2026-04-23 11:18:39 +12:00
Keith Burzinski
629da4d878 [esp32] Add Secure Boot V1 ECDSA signing scheme for pre-rev-3.0 ESP32 (#15882) 2026-04-23 11:18:39 +12:00
Jonathan Swoboda
5c2ceb63e0 [ld2412] Fix null deref in set_basic_config when entities unconfigured (#15893) 2026-04-23 11:18:39 +12:00
Jonathan Swoboda
92cb6dd7fd [core] Fix Pvariable placement new losing subclass identity (#15881) 2026-04-23 11:18:39 +12:00
Jonathan Swoboda
06e5931ad7 [image] Fix rodata bloat for multi-frame RGB565+alpha animations (#15873) 2026-04-23 11:18:39 +12:00
Clyde Stubbs
dc5b06285d [lvgl] Fix update of textarea attached to keyboard (#15866) 2026-04-23 11:18:38 +12:00
Clyde Stubbs
3d0a2421a6 [lvgl] Fix overloads for setting images on styles (#15864) 2026-04-23 11:18:38 +12:00
Clyde Stubbs
22f6791dea [lvgl] Fix format of hello world page (#15868) 2026-04-23 11:18:38 +12:00
54 changed files with 1018 additions and 155 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2026.4.1
PROJECT_NUMBER = 2026.4.4
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -598,7 +598,7 @@ async def component_resume_action_to_code(
comp = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, comp)
if CONF_UPDATE_INTERVAL in config:
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32)
cg.add(var.set_update_interval(template_))
return var

View File

@@ -62,7 +62,12 @@ void Animation::set_frame(int frame) {
}
void Animation::update_data_start_() {
const uint32_t image_size = this->get_width_stride() * this->height_;
uint32_t image_size = this->get_width_stride() * this->height_;
// RGB565 with an alpha channel stores the alpha plane immediately after the RGB
// plane within each frame, so the per-frame stride includes the alpha bytes.
if (this->type_ == image::IMAGE_TYPE_RGB565 && this->transparency_ == image::TRANSPARENCY_ALPHA_CHANNEL) {
image_size += static_cast<uint32_t>(this->width_) * this->height_;
}
this->data_start_ = this->animation_data_start_ + image_size * this->current_frame_;
}

View File

@@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_(
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Owning element type for each YAML service variable type. Used to derive both
# the zero-copy native types and the owning fallback types below.
_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = {
"bool": cg.bool_,
"int": cg.int32,
"float": cg.float_,
"string": cg.std_string,
}
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Scalars are passed by value; string uses a non-owning view into rx_buf_.
**_SERVICE_ARG_SCALAR_TYPES,
"string": cg.StringRef,
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
"string[]": cg.FixedVector.template(cg.std_string)
.operator("const")
.operator("ref"),
# Arrays are passed as non-owning const references into rx_buf_.
**{
f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref")
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
# Owning fallback types used when the action chain contains non-synchronous actions
# (delay, wait_until, script.wait, etc.). The default non-owning types reference
# storage in the receive buffer, which is reused once the synchronous portion of
# the chain returns. FixedVector is also non-copyable, so the deferred lambda
# capture in DelayAction::play_complex would fail to compile.
SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = {
"string": cg.std_string,
**{
f"{name}[]": cg.std_vector.template(t)
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
CONF_ENCRYPTION = "encryption"
CONF_BATCH_DELAY = "batch_delay"
@@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None:
func_args.append((cg.bool_, "return_response"))
# Check if action chain has non-synchronous actions that would make
# non-owning StringRef dangle (rx_buf_ reused after delay)
# non-owning args (StringRef, const FixedVector&) dangle once the
# rx_buf_ is reused after a delay/wait_until/script.wait/etc. The
# FixedVector references would also fail to compile because they
# are non-copyable and DelayAction captures args by value.
has_non_synchronous = automation.has_non_synchronous_actions(
conf.get(CONF_THEN, [])
)
service_arg_names: list[str] = []
for name, var_ in conf[CONF_VARIABLES].items():
native = SERVICE_ARG_NATIVE_TYPES[var_]
# Fall back to std::string for string args if non-synchronous actions exist
if has_non_synchronous and native is cg.StringRef:
native = cg.std_string
if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES:
native = SERVICE_ARG_FALLBACK_TYPES[var_]
else:
native = SERVICE_ARG_NATIVE_TYPES[var_]
service_template_args.append(native)
func_args.append((native, name))
service_arg_names.append(name)

View File

@@ -20,6 +20,7 @@ import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE
from esphome.platformio_api import process_stacktrace
from esphome.util import safe_print
from . import CONF_ENCRYPTION
@@ -60,7 +61,6 @@ async def async_run_logs(
noise_psk=noise_psk,
addresses=addresses, # Pass all addresses for automatic retry
)
dashboard = CORE.dashboard
backtrace_state = False
# Try platform-specific stacktrace handler first, fall back to generic
@@ -82,7 +82,11 @@ async def async_run_logs(
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
)
for parsed_msg in parse_log_message(text, timestamp):
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
# safe_print handles the dashboard \033 escaping and falls back
# to backslashreplace encoding on stdouts that can't represent
# the wifi signal-bar block characters (Windows redirected
# cp1252 pipe).
safe_print(parsed_msg)
for raw_line in text.splitlines():
if platform_process_stacktrace:
backtrace_state = platform_process_stacktrace(

View File

@@ -183,19 +183,19 @@ async def at581x_settings_to_code(config, action_id, template_arg, args):
cg.add(var.set_sensing_distance(template_))
if selfcheck := config.get(CONF_POWERON_SELFCHECK_TIME):
template_ = await cg.templatable(selfcheck, args, cg.int32)
template_ = await cg.templatable(selfcheck, args, cg.int_)
cg.add(var.set_poweron_selfcheck_time(template_))
if protect := config.get(CONF_PROTECT_TIME):
template_ = await cg.templatable(protect, args, cg.int32)
template_ = await cg.templatable(protect, args, cg.int_)
cg.add(var.set_protect_time(template_))
if trig_base := config.get(CONF_TRIGGER_BASE):
template_ = await cg.templatable(trig_base, args, cg.int32)
template_ = await cg.templatable(trig_base, args, cg.int_)
cg.add(var.set_trigger_base(template_))
if trig_keep := config.get(CONF_TRIGGER_KEEP):
template_ = await cg.templatable(trig_keep, args, cg.int32)
template_ = await cg.templatable(trig_keep, args, cg.int_)
cg.add(var.set_trigger_keep(template_))
if (stage_gain := config.get(CONF_STAGE_GAIN)) is not None:

View File

@@ -413,7 +413,7 @@ async def deep_sleep_enter_to_code(config, action_id, template_arg, args):
paren = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, paren)
if CONF_SLEEP_DURATION in config:
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.int32)
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.uint32)
cg.add(var.set_sleep_duration(template_))
if CONF_UNTIL in config:

View File

@@ -128,23 +128,30 @@ ASSERTION_LEVELS = {
SIGNING_SCHEMES = {
"rsa3072": "CONFIG_SECURE_SIGNED_APPS_RSA_SCHEME",
"ecdsa256": "CONFIG_SECURE_SIGNED_APPS_ECDSA_V2_SCHEME",
"ecdsa_v1": "CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME",
}
# Chip variants that only support one signing scheme for Secure Boot V2.
# Chip variants that only support one V2 signing scheme.
# Based on SOC_SECURE_BOOT_V2_RSA / SOC_SECURE_BOOT_V2_ECC in soc_caps.h.
# Variants not listed in either set support both RSA and ECDSA
# Variants not listed in either set support both RSA and ECDSA V2
# (e.g. C5, C6, H2, P4). New variants should be added to the
# appropriate set if they only support one scheme.
SIGNED_OTA_RSA_ONLY_VARIANTS = {
VARIANT_ESP32,
# Note: VARIANT_ESP32 is not listed here because it supports V2 RSA only
# when minimum_chip_revision >= 3.0, which requires special handling.
SIGNED_OTA_V2_RSA_ONLY_VARIANTS = {
VARIANT_ESP32S2,
VARIANT_ESP32S3,
VARIANT_ESP32C3,
}
SIGNED_OTA_ECC_ONLY_VARIANTS = {
SIGNED_OTA_V2_ECC_ONLY_VARIANTS = {
VARIANT_ESP32C2,
VARIANT_ESP32C61,
}
# V1 ECDSA (Secure Boot V1) is only supported on the original ESP32.
# Based on SOC_SECURE_BOOT_V1 in soc_caps.h.
SIGNED_OTA_V1_ECDSA_VARIANTS = {
VARIANT_ESP32,
}
COMPILER_OPTIMIZATIONS = {
"DEBUG": "CONFIG_COMPILER_OPTIMIZATION_DEBUG",
@@ -991,25 +998,73 @@ def final_validate(config):
if signed_ota := advanced.get(CONF_SIGNED_OTA_VERIFICATION):
scheme = signed_ota[CONF_SIGNING_SCHEME]
variant = config[CONF_VARIANT]
scheme_variant_conflicts = {
"ecdsa256": (SIGNED_OTA_RSA_ONLY_VARIANTS, "rsa3072"),
"rsa3072": (SIGNED_OTA_ECC_ONLY_VARIANTS, "ecdsa256"),
}
if (conflict := scheme_variant_conflicts.get(scheme)) and variant in conflict[
0
]:
min_rev = advanced.get(CONF_MINIMUM_CHIP_REVISION)
scheme_path = [
CONF_FRAMEWORK,
CONF_ADVANCED,
CONF_SIGNED_OTA_VERIFICATION,
CONF_SIGNING_SCHEME,
]
# V1 ECDSA is only available on the original ESP32
if scheme == "ecdsa_v1" and variant not in SIGNED_OTA_V1_ECDSA_VARIANTS:
errs.append(
cv.Invalid(
f"Signing scheme '{scheme}' is not supported on "
f"{VARIANT_FRIENDLY[variant]}. Use '{conflict[1]}' instead.",
path=[
CONF_FRAMEWORK,
CONF_ADVANCED,
CONF_SIGNED_OTA_VERIFICATION,
CONF_SIGNING_SCHEME,
],
f"Signing scheme 'ecdsa_v1' is only supported on "
f"{VARIANT_FRIENDLY[VARIANT_ESP32]}. "
f"Use 'rsa3072' or 'ecdsa256' instead.",
path=scheme_path,
)
)
elif variant == VARIANT_ESP32:
# On ESP32, V2 RSA requires minimum_chip_revision >= 3.0
# Note: string comparison works here because cv.one_of constrains
# min_rev to known ESP32_CHIP_REVISIONS values ("0.0".."3.1").
if scheme == "rsa3072" and (min_rev is None or min_rev < "3.0"):
errs.append(
cv.Invalid(
f"Signing scheme 'rsa3072' on {VARIANT_FRIENDLY[variant]} "
f"requires minimum_chip_revision: '3.0' or higher "
f"(Secure Boot V2 RSA needs chip revision 3.0+). "
f"For older chip revisions, use 'ecdsa_v1' instead.",
path=scheme_path,
)
)
# ESP32 does not support V2 ECDSA (no SOC_SECURE_BOOT_V2_ECC)
elif scheme == "ecdsa256":
errs.append(
cv.Invalid(
f"Signing scheme 'ecdsa256' is not supported on "
f"{VARIANT_FRIENDLY[variant]}. Use 'rsa3072' (with "
f"minimum_chip_revision: '3.0') or 'ecdsa_v1' instead.",
path=scheme_path,
)
)
# V1 on rev 3.0+ -- suggest V2 RSA for stronger security
elif scheme == "ecdsa_v1" and min_rev is not None and min_rev >= "3.0":
_LOGGER.info(
"Using Secure Boot V1 ECDSA on %s rev %s. "
"Consider using 'rsa3072' (Secure Boot V2 RSA) for "
"stronger security on chip revision 3.0+.",
VARIANT_FRIENDLY[variant],
min_rev,
)
else:
# Non-ESP32 variants: check V2 scheme-variant compatibility
scheme_variant_conflicts = {
"ecdsa256": (SIGNED_OTA_V2_RSA_ONLY_VARIANTS, "rsa3072"),
"rsa3072": (SIGNED_OTA_V2_ECC_ONLY_VARIANTS, "ecdsa256"),
}
if (
conflict := scheme_variant_conflicts.get(scheme)
) and variant in conflict[0]:
errs.append(
cv.Invalid(
f"Signing scheme '{scheme}' is not supported on "
f"{VARIANT_FRIENDLY[variant]}. Use '{conflict[1]}' instead.",
path=scheme_path,
)
)
if CONF_OTA not in full_config:
_LOGGER.warning(
"Signed OTA verification is enabled but no OTA component is configured. "
@@ -1685,7 +1740,17 @@ async def to_code(config):
# Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r
# (~11 KB). See printf_stubs.cpp for implementation.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]:
#
# The wrap is only beneficial against newlib. Picolibc's tinystdio
# implements vsnprintf by building a string-output FILE and calling
# vfprintf, so vfprintf is unconditionally linked in by any caller
# of snprintf/vsnprintf — effectively every build — and the wrap
# saves nothing while costing ~170 B of shim. IDF 5.x defaults to
# newlib on every variant; IDF 6.0+ switches to picolibc on every
# variant.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version(
6, 0, 0
):
cg.add_define("USE_FULL_PRINTF")
else:
for symbol in ("vprintf", "printf", "fprintf", "vfprintf"):

View File

@@ -5,6 +5,7 @@ import json # noqa: E402
import os # noqa: E402
import pathlib # noqa: E402
import shutil # noqa: E402
import subprocess # noqa: E402
from glob import glob # noqa: E402
@@ -25,6 +26,114 @@ def _parse_sdkconfig(sdkconfig_path):
return options
def _generate_v1_verification_key(env):
"""Generate the V1 ECDSA verification key binary and assembly source file.
Secure Boot V1 embeds the public verification key directly in the app binary
as a compiled object (via a .S assembly file). The ESP-IDF CMake build generates
these files via custom commands, but PlatformIO's SCons bridge does not execute
them. This function replicates that logic:
1. Extracts the raw public key from the PEM signing key using espsecure.
2. Generates the .S assembly source that embeds the key bytes.
"""
build_dir = pathlib.Path(env.subst("$BUILD_DIR"))
project_dir = pathlib.Path(env.subst("$PROJECT_DIR"))
pioenv = env.subst("$PIOENV")
sdkconfig = _parse_sdkconfig(project_dir / f"sdkconfig.{pioenv}")
if sdkconfig.get("CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME") != "y":
return
bin_path = build_dir / "signature_verification_key.bin"
asm_path = build_dir / "signature_verification_key.bin.S"
# Determine the source of the verification key
if sdkconfig.get("CONFIG_SECURE_BOOT_BUILD_SIGNED_BINARIES") == "y":
# Extract public key from the signing key
signing_key = sdkconfig.get("CONFIG_SECURE_BOOT_SIGNING_KEY")
if not signing_key:
return
signing_key_path = pathlib.Path(signing_key)
if not signing_key_path.exists():
print(f"Error: V1 ECDSA signing key not found: {signing_key_path}")
env.Exit(1)
return
if not bin_path.exists() or bin_path.stat().st_mtime < signing_key_path.stat().st_mtime:
python_exe = env.subst("$PYTHONEXE")
result = subprocess.run(
[python_exe, "-m", "espsecure", "extract_public_key",
"--keyfile", str(signing_key_path), str(bin_path)],
capture_output=True, text=True,
)
if result.returncode != 0:
print(f"Error extracting V1 verification key: {result.stderr}")
env.Exit(1)
return
print(f"Extracted V1 ECDSA verification key from {signing_key_path.name}")
else:
# User-provided verification key -- should already be a raw binary file
verification_key = sdkconfig.get("CONFIG_SECURE_BOOT_VERIFICATION_KEY")
if not verification_key:
return
verification_key_path = pathlib.Path(verification_key)
if not verification_key_path.exists():
print(f"Error: Verification key not found: {verification_key_path}")
env.Exit(1)
return
shutil.copyfile(str(verification_key_path), str(bin_path))
if not bin_path.exists():
return
# Generate the .S assembly file from the binary key data.
# Replicates ESP-IDF's data_file_embed_asm.cmake with RENAME_TO=signature_verification_key_bin.
# The file is needed in both the app build dir and the bootloader build dir, since
# the bootloader also embeds the verification key when CONFIG_SECURE_SIGNED_ON_BOOT_NO_SECURE_BOOT
# is enabled. PlatformIO's SCons bridge does not execute the CMake custom commands that
# normally generate these files.
data = bin_path.read_bytes()
varname = "signature_verification_key_bin"
lines = []
lines.append(f"/* Data converted from {bin_path.name} */")
lines.append(".data")
lines.append("#if !defined (__APPLE__) && !defined (__linux__)")
lines.append(".section .rodata.embedded")
lines.append("#endif")
lines.append(f"\n.global {varname}")
lines.append(f"{varname}:")
lines.append(f"\n.global _binary_{varname}_start")
lines.append(f"_binary_{varname}_start: /* for objcopy compatibility */")
# Format binary data as .byte lines (16 bytes per line)
for i in range(0, len(data), 16):
chunk = data[i:i + 16]
hex_bytes = ", ".join(f"0x{b:02x}" for b in chunk)
lines.append(f".byte {hex_bytes}")
lines.append(f"\n.global _binary_{varname}_end")
lines.append(f"_binary_{varname}_end: /* for objcopy compatibility */")
lines.append(f"\n.global {varname}_length")
lines.append(f"{varname}_length:")
lines.append(f".long {len(data)}")
lines.append("")
lines.append('#if defined (__linux__)')
lines.append('.section .note.GNU-stack,"",@progbits')
lines.append("#endif")
asm_content = "\n".join(lines) + "\n"
# Write to app build dir and bootloader build dir
asm_path.write_text(asm_content)
bootloader_dir = build_dir / "bootloader"
if bootloader_dir.is_dir():
bootloader_bin = bootloader_dir / "signature_verification_key.bin"
bootloader_asm = bootloader_dir / "signature_verification_key.bin.S"
shutil.copyfile(str(bin_path), str(bootloader_bin))
bootloader_asm.write_text(asm_content)
def sign_firmware(source, target, env):
"""
Sign the firmware binary using espsecure.py if signed OTA verification is enabled.
@@ -55,9 +164,12 @@ def sign_firmware(source, target, env):
env.Exit(1)
return
# ESPHome only exposes RSA3072 and ECDSA256 (both Secure Boot V2 schemes),
# so the espsecure signature version is always 2.
sign_version = "2"
# Determine espsecure signature version from the signing scheme:
# V1 ECDSA (Secure Boot V1) uses --version 1, V2 RSA/ECDSA use --version 2.
if sdkconfig.get("CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME") == "y":
sign_version = "1"
else:
sign_version = "2"
firmware_name = os.path.basename(env.subst("$PROGNAME")) + ".bin"
firmware_path = build_dir / firmware_name
@@ -217,6 +329,11 @@ def esp32_copy_ota_bin(source, target, env):
print(f"Copied firmware to {new_file_name}")
# Generate V1 ECDSA verification key files before build starts.
# Workaround for PlatformIO not executing CMake custom commands that extract
# the public key and generate the .S assembly file for Secure Boot V1.
_generate_v1_verification_key(env) # noqa: F821
# Run signing first, then merge, then ota copy
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", sign_firmware) # noqa: F821
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", merge_factory_bin) # noqa: F821

View File

@@ -22,6 +22,12 @@ struct NVSData {
static std::vector<NVSData> s_pending_save; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
// open() runs from app_main() before the logger is initialized, so any failure
// must be deferred until after global_logger is set. This is emitted from the
// first make_preference() call, which runs from the generated setup() after
// log->pre_setup() has run at EARLY_INIT priority.
static esp_err_t s_open_err = ESP_OK; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
bool ESP32PreferenceBackend::save(const uint8_t *data, size_t len) {
// try find in pending saves and update that
for (auto &obj : s_pending_save) {
@@ -74,12 +80,14 @@ bool ESP32PreferenceBackend::load(uint8_t *data, size_t len) {
}
void ESP32Preferences::open() {
// Runs from app_main() before the logger is initialized; any logging here
// must be deferred. See s_open_err and make_preference() below.
nvs_flash_init();
esp_err_t err = nvs_open("esphome", NVS_READWRITE, &this->nvs_handle);
if (err == 0)
return;
ESP_LOGW(TAG, "nvs_open failed: %s - erasing NVS", esp_err_to_name(err));
s_open_err = err;
nvs_flash_deinit();
nvs_flash_erase();
nvs_flash_init();
@@ -91,6 +99,14 @@ void ESP32Preferences::open() {
}
ESPPreferenceObject ESP32Preferences::make_preference(size_t length, uint32_t type) {
if (s_open_err != ESP_OK) {
if (this->nvs_handle == 0) {
ESP_LOGW(TAG, "nvs_open failed: %s - NVS unavailable", esp_err_to_name(s_open_err));
} else {
ESP_LOGW(TAG, "nvs_open failed: %s - erased NVS", esp_err_to_name(s_open_err));
}
s_open_err = ESP_OK;
}
auto *pref = new ESP32PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
pref->nvs_handle = this->nvs_handle;
pref->key = type;

View File

@@ -1,32 +1,38 @@
/*
* Linker wrap stubs for FILE*-based printf functions.
* Linker wrap stubs for FILE*-based printf functions (newlib only).
*
* ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference
* fprintf(), printf(), vprintf(), and vfprintf() which pull in the full
* printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on
* picolibc's vfprintf). This is a separate implementation from the one
* used by snprintf/vsnprintf that handles FILE* stream I/O with buffering
* and locking.
* fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull
* in _vfprintf_r (~11 KB) — a separate implementation from the one used
* by snprintf/vsnprintf that handles FILE* stream I/O with buffering.
*
* ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(),
* so the SDK's vprintf() path is dead code at runtime. The fprintf()
* and printf() calls in SDK components are only in debug/assert paths
* (gpio_dump_io_configuration, ringbuf diagnostics) that are either
* GC'd or never called. Crash backtraces and panic output are
* unaffected they use esp_rom_printf() which is a ROM function
* and does not go through libc.
* unaffected; they use esp_rom_printf() which is a ROM function and
* does not go through libc.
*
* These stubs redirect through vsnprintf() (which uses _svfprintf_r
* already in the binary) and fwrite(), allowing the linker to
* dead-code eliminate _vfprintf_r.
* This wrap is newlib-only. On picolibc, vsnprintf is implemented as
* vfprintf into a string-output FILE, so vfprintf is unconditionally
* linked in by any caller of snprintf/vsnprintf and the wrap can never
* elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF
* on picolibc builds (IDF 6.0+ on all variants) so this file compiles
* to nothing there; the #error below catches a desynchronised gate.
*
* Saves ~11 KB of flash.
* Saves ~11 KB of flash on newlib.
*
* To disable these wraps, set enable_full_printf: true in the esp32
* advanced config section.
* To disable this wrap on newlib, set enable_full_printf: true in the
* esp32 advanced config section.
*/
#if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF)
#ifdef __PICOLIBC__
#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF"
#endif
#include <cstdarg>
#include <cstdio>
@@ -34,6 +40,9 @@
namespace esphome::esp32 {}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
static constexpr size_t PRINTF_BUFFER_SIZE = 512;
// These stubs are essentially dead code at runtime — ESPHome replaces the
@@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) {
return len;
}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
int __wrap_vprintf(const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_printf(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
@@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) {
return len;
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_fprintf(FILE *stream, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);

View File

@@ -216,6 +216,7 @@ void ESP32TouchComponent::setup() {
// Do initial oneshot scans to populate baseline values
for (uint32_t i = 0; i < ONESHOT_SCAN_COUNT; i++) {
err = touch_sensor_trigger_oneshot_scanning(this->sens_handle_, ONESHOT_SCAN_TIMEOUT_MS);
App.feed_wdt(); // 3 scans with 2s timeout might exceed WDT, so feed it here to be safe
if (err != ESP_OK) {
ESP_LOGW(TAG, "Oneshot scan %" PRIu32 " failed: %s", i, esp_err_to_name(err));
}

View File

@@ -744,21 +744,28 @@ async def write_image(config, all_frames=False):
if frame_count <= 1:
_LOGGER.warning("Image file %s has no animation frames", path)
total_rows = height * frame_count
encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha)
if byte_order := config.get(CONF_BYTE_ORDER):
# Check for valid type has already been done in validate_settings
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
# Encode each frame with its own encoder and concatenate. This keeps every
# frame self-contained on disk (e.g. RGB565+alpha emits [RGB plane | alpha plane]
# per frame) so animation frame stepping in image.cpp / animation.cpp stays
# correct without needing to know the total frame count.
byte_order = config.get(CONF_BYTE_ORDER)
combined_data: list[int] = []
encoder: ImageEncoder | None = None
for frame_index in range(frame_count):
image.seek(frame_index)
encoder = IMAGE_TYPE[type](width, height, transparency, dither, invert_alpha)
if byte_order is not None:
# Check for valid type has already been done in validate_settings
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
pixels = encoder.convert(image.resize((width, height)), path).getdata()
for row in range(height):
for col in range(width):
encoder.encode(pixels[row * width + col])
encoder.end_row()
encoder.end_image()
combined_data.extend(encoder.data)
rhs = [HexInt(x) for x in encoder.data]
rhs = [HexInt(x) for x in combined_data]
prog_arr = cg.progmem_array(config[CONF_RAW_DATA_ID], rhs)
image_type = get_image_type_enum(type)
trans_value = get_transparency_enum(encoder.transparency)

View File

@@ -766,32 +766,38 @@ void LD2412Component::get_distance_resolution_() { this->send_command_(CMD_QUERY
void LD2412Component::query_light_control_() { this->send_command_(CMD_QUERY_LIGHT_CONTROL, nullptr, 0); }
void LD2412Component::set_basic_config() {
uint8_t min_gate = 1;
uint8_t max_gate = TOTAL_GATES;
uint16_t timeout = DEFAULT_PRESENCE_TIMEOUT;
uint8_t out_pin_level = 0x01;
#ifdef USE_NUMBER
if (!this->min_distance_gate_number_->has_state() || !this->max_distance_gate_number_->has_state() ||
!this->timeout_number_->has_state()) {
return;
if (this->min_distance_gate_number_ != nullptr) {
if (!this->min_distance_gate_number_->has_state())
return;
min_gate = static_cast<int>(this->min_distance_gate_number_->state);
}
if (this->max_distance_gate_number_ != nullptr) {
if (!this->max_distance_gate_number_->has_state())
return;
max_gate = static_cast<int>(this->max_distance_gate_number_->state) + 1;
}
if (this->timeout_number_ != nullptr) {
if (!this->timeout_number_->has_state())
return;
timeout = static_cast<int>(this->timeout_number_->state);
}
#endif
#ifdef USE_SELECT
if (!this->out_pin_level_select_->has_state()) {
return;
if (this->out_pin_level_select_ != nullptr) {
if (!this->out_pin_level_select_->has_state())
return;
out_pin_level = find_uint8(OUT_PIN_LEVELS_BY_STR, this->out_pin_level_select_->current_option().c_str());
}
#endif
uint8_t value[5] = {
#ifdef USE_NUMBER
lowbyte(static_cast<int>(this->min_distance_gate_number_->state)),
lowbyte(static_cast<int>(this->max_distance_gate_number_->state) + 1),
lowbyte(static_cast<int>(this->timeout_number_->state)),
highbyte(static_cast<int>(this->timeout_number_->state)),
#else
1, TOTAL_GATES, DEFAULT_PRESENCE_TIMEOUT, 0,
#endif
#ifdef USE_SELECT
find_uint8(OUT_PIN_LEVELS_BY_STR, this->out_pin_level_select_->current_option().c_str()),
#else
0x01, // Default value if not using select
#endif
lowbyte(min_gate), lowbyte(max_gate), lowbyte(timeout), highbyte(timeout), out_pin_level,
};
this->set_config_mode_(true);
this->send_command_(CMD_BASIC_CONF, value, sizeof(value));

View File

@@ -89,10 +89,12 @@
id: hello_world_label_
text: "Hello World!"
align: center
- obj:
- container:
id: hello_world_qrcode_
outline_width: 0
border_width: 0
height: 100
width: 100
hidden: !lambda |-
return lv_obj_get_width(lv_screen_active()) < 300 && lv_obj_get_height(lv_screen_active()) < 400;
widgets:

View File

@@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) {
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) {
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) {
auto *scale = lv_obj_get_parent(obj);
auto min_value = lv_scale_get_range_min_value(scale);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) +
auto max_value = lv_scale_get_range_max_value(scale);
value = clamp(value, min_value, max_value);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) +
lv_scale_get_rotation((scale))) %
360;
}

View File

@@ -88,6 +88,12 @@ inline void lv_obj_set_style_bitmap_mask_src(lv_obj_t *obj, image::Image *image,
inline void lv_obj_set_style_bg_image_src(lv_obj_t *obj, image::Image *image, lv_style_selector_t selector) {
::lv_obj_set_style_bg_image_src(obj, image->get_lv_image_dsc(), selector);
}
inline void lv_style_set_bg_image_src(lv_style_t *style, image::Image *image) {
::lv_style_set_bg_image_src(style, image->get_lv_image_dsc());
}
inline void lv_style_set_bitmap_mask_src(lv_style_t *style, image::Image *image) {
::lv_style_set_bitmap_mask_src(style, image->get_lv_image_dsc());
}
#endif // USE_LVGL_IMAGE
#ifdef USE_LVGL_ANIMIMG
inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images) {
@@ -106,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images
#endif // USE_LVGL_ANIMIMG
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value);
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value);
#endif
// Parent class for things that wrap an LVGL object

View File

@@ -52,19 +52,23 @@ class KeyboardType(WidgetType):
if mode := config.get(CONF_MODE):
await w.set_property(CONF_MODE, await KEYBOARD_MODES.process(mode))
if textarea := config.get(CONF_TEXTAREA):
# If a textarea is configured, it must be generated before the keyboard can attach it.
# If not yet configured, defer the attachment code.
if not is_widget_completed(textarea):
# Can only happen for an initial config, where the keyboard is configured before the
# textarea, so it's ok to always emit into the global context
async def add_textarea():
async with LvContext():
await w.set_property(
CONF_TEXTAREA,
(await get_widgets(config, CONF_TEXTAREA))[0].obj,
)
async def add_textarea():
async with LvContext():
await w.set_property(
CONF_TEXTAREA, (await get_widgets(config, CONF_TEXTAREA))[0].obj
)
if is_widget_completed(textarea):
await add_textarea()
else:
CORE.add_job(add_textarea)
else:
# Handles updates in automations, and properly ordered initial config. Code is generated
# into the enclosing context (main or lambda)
await w.set_property(
CONF_TEXTAREA, (await get_widgets(config, CONF_TEXTAREA))[0].obj
)
keyboard_spec = KeyboardType()

View File

@@ -22,7 +22,7 @@ from ..defines import (
literal,
)
from ..lv_validation import animated, lv_int, size
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj, lv_Pvariable
from ..schemas import container_schema, part_schema
from ..types import LV_EVENT, LvType, ObjUpdateAction, lv_obj_t, lv_obj_t_ptr
from . import Widget, WidgetType, add_widgets, get_widgets, set_obj_properties
@@ -83,8 +83,8 @@ class TabviewType(WidgetType):
await w.set_property("tab_bar_size", await size.process(config[CONF_SIZE]))
for tab_conf in config[CONF_TABS]:
w_id = tab_conf[CONF_ID]
tab_obj = cg.Pvariable(w_id, cg.nullptr, type_=lv_tab_t)
tab_widget = Widget.create(w_id, tab_obj, obj_spec)
tab_obj = lv_Pvariable(lv_tab_t, w_id)
tab_widget = Widget.create(w_id, tab_obj, obj_spec, tab_conf)
lv_assign(tab_obj, lv_expr.tabview_add_tab(w.obj, tab_conf[CONF_NAME]))
await set_obj_properties(tab_widget, tab_conf)
await add_widgets(tab_widget, tab_conf)

View File

@@ -37,7 +37,10 @@ void IRAM_ATTR MCP23016::gpio_intr(MCP23016 *arg) { arg->enable_loop_soon_any_co
void MCP23016::loop() {
// Invalidate cache at the start of each loop
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -2,6 +2,7 @@ from esphome import pins
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ALLOW_OTHER_USES,
CONF_ID,
CONF_INPUT,
CONF_INTERRUPT,
@@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = {
"FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING,
}
def _validate_interrupt_pin(value):
# The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR)
# and installs a single ISR per GPIO, so neither inversion nor sharing is supported.
value = pins.internal_gpio_input_pin_schema(value)
if value.get(CONF_INVERTED):
raise cv.Invalid(
f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"the MCP23xxx INT line is fixed active-low"
)
if value.get(CONF_ALLOW_OTHER_USES):
raise cv.Invalid(
f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"sharing the interrupt pin between multiple MCP23xxx (or other components) "
"is not implemented. Remove the interrupt_pin to fall back to polling."
)
return value
MCP23XXX_CONFIG_SCHEMA = cv.Schema(
{
cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean,
cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema,
cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin,
}
).extend(cv.COMPONENT_SCHEMA)

View File

@@ -21,7 +21,10 @@ template<uint8_t N> class MCP23XXXBase : public Component, public gpio_expander:
void loop() override {
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -16,6 +16,13 @@ namespace esphome::nextion {
static const char *const TAG = "nextion.upload.arduino";
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
// Timeout for display acknowledgment during TFT upload (ms).
// A single value is used for all chunks; the happy path returns as soon as
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
// reports showed the previous 500ms steady-state value was too tight for
// some firmware variants.
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
// Followed guide
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
@@ -80,14 +87,14 @@ int Nextion::upload_by_chunks_(HTTPClient &http_client, uint32_t &range_start) {
recv_string.clear();
this->write_array(buffer, buffer_size);
App.feed_wdt();
this->recv_ret_string_(recv_string, this->upload_first_chunk_sent_ ? 500 : 5000, true);
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
this->content_length_ -= read_len;
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
ESP_LOGD(TAG, "Upload: %0.2f%% (%" PRIu32 " left, heap: %" PRIu32 ")", upload_percentage, this->content_length_,
EspClass::getFreeHeap());
this->upload_first_chunk_sent_ = true;
if (recv_string.empty()) {
ESP_LOGW(TAG, "No response from display during upload");
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
allocator.deallocate(buffer, 4096);
buffer = nullptr;
return -1;

View File

@@ -19,6 +19,13 @@ namespace esphome::nextion {
static const char *const TAG = "nextion.upload.esp32";
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
// Timeout for display acknowledgment during TFT upload (ms).
// A single value is used for all chunks; the happy path returns as soon as
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
// reports showed the previous 500ms steady-state value was too tight for
// some firmware variants.
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
// Followed guide
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
@@ -96,7 +103,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
recv_string.clear();
this->write_array(buffer, buffer_size);
App.feed_wdt();
this->recv_ret_string_(recv_string, upload_first_chunk_sent_ ? 500 : 5000, true);
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
this->content_length_ -= read_len;
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
#ifdef USE_PSRAM
@@ -109,7 +116,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
#endif
upload_first_chunk_sent_ = true;
if (recv_string.empty()) {
ESP_LOGW(TAG, "No response from display during upload");
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
allocator.deallocate(buffer, 4096);
buffer = nullptr;
return -1;

View File

@@ -62,7 +62,10 @@ void IRAM_ATTR PCA6416AComponent::gpio_intr(PCA6416AComponent *arg) { arg->enabl
void PCA6416AComponent::loop() {
// Invalidate cache at the start of each loop
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -50,8 +50,10 @@ void IRAM_ATTR PCA9554Component::gpio_intr(PCA9554Component *arg) { arg->enable_
void PCA9554Component::loop() {
// Invalidate the cache so the next digital_read() triggers a fresh I2C read
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Interrupt-driven: disable loop until next interrupt fires
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -31,8 +31,10 @@ void IRAM_ATTR PCF8574Component::gpio_intr(PCF8574Component *arg) { arg->enable_
void PCF8574Component::loop() {
// Invalidate the cache so the next digital_read() triggers a fresh I2C read
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Interrupt-driven: disable loop until next interrupt fires
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -82,7 +82,10 @@ void PI4IOE5V6408Component::pin_mode(uint8_t pin, gpio::Flags flags) {
void PI4IOE5V6408Component::loop() {
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -129,6 +129,6 @@ async def to_code(config):
async def sensor_template_publish_to_code(config, action_id, template_arg, args):
paren = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, paren)
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int32)
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int_)
cg.add(var.set_value(template_))
return var

View File

@@ -57,7 +57,10 @@ void TCA9555Component::pin_mode(uint8_t pin, gpio::Flags flags) {
}
void TCA9555Component::loop() {
this->reset_pin_cache_();
if (this->interrupt_pin_ != nullptr) {
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
// stale state forever; keep looping until the line is released so we self-heal.
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
this->disable_loop();
}
}

View File

@@ -1,3 +1,4 @@
import errno
from importlib import resources
import logging
@@ -74,6 +75,12 @@ def _load_tzdata(iana_key: str) -> bytes | None:
return (resources.files(package) / resource).read_bytes()
except (FileNotFoundError, ModuleNotFoundError, IsADirectoryError):
return None
except OSError as e:
# Windows raises EINVAL for paths with NTFS-illegal chars (e.g. '<'/'>'
# in POSIX TZ strings like "<+08>-8" that validate_tz feeds back here).
if e.errno == errno.EINVAL:
return None
raise
def _extract_tz_string(tzfile: bytes) -> str:

View File

@@ -116,12 +116,23 @@ CONFIG_SCHEMA = cv.ensure_list(
async def to_code(config):
# The output chunk pool/queue are compile-time-sized templates shared by all
# USBUartChannel instances, so use the largest buffer_size across every channel
# of every device. Each chunk is 64 bytes (USB FS MPS); add one extra slot
# because LockFreeQueue<T,N> is a ring buffer that wastes one entry.
max_buffer_size = max(
channel[CONF_BUFFER_SIZE]
for device in config
for channel in device[CONF_CHANNELS]
)
output_chunk_count = max_buffer_size // 64 + 1
cg.add_define("USB_UART_OUTPUT_CHUNK_COUNT", output_chunk_count)
for device in config:
var = await register_usb_client(device)
for index, channel in enumerate(device[CONF_CHANNELS]):
chvar = cg.new_Pvariable(channel[CONF_ID], index, channel[CONF_BUFFER_SIZE])
await cg.register_parented(chvar, var)
cg.add(chvar.set_rx_buffer_size(channel[CONF_BUFFER_SIZE]))
cg.add(chvar.set_stop_bits(channel[CONF_STOP_BITS]))
cg.add(chvar.set_data_bits(channel[CONF_DATA_BITS]))
cg.add(chvar.set_parity(channel[CONF_PARITY]))

View File

@@ -132,8 +132,9 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon
friend class USBUartTypeCH34X;
public:
// Number of output chunk slots per channel (8 × 64 bytes = 512 bytes peak, lazily allocated)
static constexpr uint8_t USB_OUTPUT_CHUNK_COUNT = 8;
// Number of output chunk slots per channel, derived from buffer_size config.
// Computed as ceil(buffer_size / 64) + 1 in Python codegen; defaults to 5 (256 / 64 + 1).
static constexpr uint8_t USB_OUTPUT_CHUNK_COUNT = USB_UART_OUTPUT_CHUNK_COUNT;
USBUartChannel(uint8_t index, uint16_t buffer_size) : index_(index), input_buffer_(RingBuffer(buffer_size)) {}
void write_array(const uint8_t *data, size_t len) override;

View File

@@ -1570,6 +1570,8 @@ void WiFiComponent::check_connecting_finished(uint32_t now) {
#endif
this->state_ = WIFI_COMPONENT_STATE_STA_CONNECTED;
// Refresh is_connected() cache; loop()'s refresh ran before this transition.
this->update_connected_state_();
this->num_retried_ = 0;
this->print_connect_params_();

View File

@@ -948,6 +948,8 @@ void WiFiComponent::process_pending_callbacks_() {
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
if (this->pending_.disconnect) {
this->pending_.disconnect = false;
// Refresh is_connected() cache here, not in the SDK callback (sys context).
this->update_connected_state_();
this->notify_disconnect_state_listeners_();
}
#endif

View File

@@ -179,7 +179,10 @@ void WiFiComponent::wifi_pre_setup_() {
#endif // USE_WIFI_AP
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
// cfg.nvs_enable = false;
if (global_preferences->nvs_handle == 0) {
ESP_LOGW(TAG, "starting wifi without nvs");
cfg.nvs_enable = false;
}
err = esp_wifi_init(&cfg);
if (err != ERR_OK) {
ESP_LOGE(TAG, "esp_wifi_init failed: %s", esp_err_to_name(err));
@@ -796,6 +799,8 @@ void WiFiComponent::wifi_process_event_(IDFWiFiEvent *data) {
s_sta_connected = false;
s_sta_connecting = false;
error_from_callback_ = true;
// Refresh is_connected() cache; error_from_callback_ makes it false.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -536,6 +536,8 @@ void WiFiComponent::wifi_process_event_(LTWiFiEvent *event) {
this->error_from_callback_ = true;
}
// Refresh is_connected() cache; sta_state_/error_from_callback_ make it false.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -342,6 +342,8 @@ void WiFiComponent::wifi_loop_() {
s_sta_was_connected = false;
s_sta_had_ip = false;
ESP_LOGV(TAG, "Disconnected");
// Refresh is_connected() cache; driver link status reports disconnected.
this->update_connected_state_();
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
this->notify_disconnect_state_listeners_();
#endif

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2026.4.1"
__version__ = "2026.4.4"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -284,6 +284,7 @@
#define ESPHOME_WIFI_POWER_SAVE_LISTENERS 2
#define USE_WIFI_RUNTIME_POWER_SAVE
#define USB_HOST_MAX_REQUESTS 16
#define USB_UART_OUTPUT_CHUNK_COUNT 5
#ifdef USE_ARDUINO
#define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 3, 7)

View File

@@ -606,33 +606,43 @@ def Pvariable(id_: ID, rhs: SafeExpType, type_: "MockObj" = None) -> "MockObj":
if isinstance(rhs, MockObj) and rhs.is_new_expr:
# For 'new' allocations, use placement new into static storage
# to avoid heap fragmentation on embedded devices.
the_type = id_.type
#
# Storage must be sized and aligned for the actual instantiated class,
# which may be a subclass of id_.type (e.g. `cv.declare_id(BaseClass)`
# combined with `SubClass.new()` — used by ili9xxx, waveshare_epaper,
# etc. to select a model-specific constructor). Using id_.type would
# run the base-class default constructor instead, silently losing any
# subclass initialization. Template args live on the CallExpression
# and are re-emitted below.
call_expr = rhs.base
assert isinstance(call_expr, CallExpression), (
f"Expected CallExpression for placement new, got {type(call_expr)}"
)
actual_type = rhs.new_type if rhs.new_type is not None else id_.type
if call_expr.template_args is not None:
actual_type = f"{actual_type}{call_expr.template_args}"
pointer_type = id_.type
# Extract component namespace from type for memory analysis attribution
component_ns = _extract_component_ns(str(the_type))
component_ns = _extract_component_ns(str(actual_type))
storage_name = f"{component_ns}__{id_.id}__pstorage"
# Declare aligned byte array for the object storage
CORE.add_global(
RawStatement(
f"alignas({the_type}) static unsigned char {storage_name}[sizeof({the_type})];"
f"alignas({actual_type}) static unsigned char {storage_name}[sizeof({actual_type})];"
)
)
# Pointer declaration uses id_.type to preserve the declared base-class
# pointer type for downstream callers (polymorphism through base ptr).
CORE.add_global(
AssignmentExpression(
f"static {the_type}",
f"static {pointer_type}",
"*const ",
id_,
MockObj(f"reinterpret_cast<{the_type} *>({storage_name})"),
MockObj(f"reinterpret_cast<{pointer_type} *>({storage_name})"),
)
)
# Extract args from the CallExpression and rebuild as placement new.
# Template args are already encoded in the_type (e.g. GlobalsComponent<int>),
# so we only pass the constructor args, not template_args.
call_expr = rhs.base
assert isinstance(call_expr, CallExpression), (
f"Expected CallExpression for placement new, got {type(call_expr)}"
)
placement_new = CallExpression(f"new({id_.id}) {the_type}", *call_expr.args)
placement_new = CallExpression(f"new({id_.id}) {actual_type}", *call_expr.args)
CORE.add(ExpressionStatement(placement_new))
else:
decl = VariableDeclarationExpression(id_.type, "*", id_, static=True)
@@ -869,12 +879,16 @@ class MockObj(Expression):
Mostly consists of magic methods that allow ESPHome's codegen syntax.
"""
__slots__ = ("base", "op", "is_new_expr")
__slots__ = ("base", "op", "is_new_expr", "new_type")
def __init__(self, base, op=".", is_new_expr=False) -> None:
def __init__(self, base, op=".", is_new_expr=False, new_type=None) -> None:
self.base = base
self.op = op
self.is_new_expr = is_new_expr
# For `is_new_expr=True` objects, `new_type` holds the class name being
# constructed (e.g. "ili9xxx::ILI9XXXST7789V"). Needed by Pvariable so
# placement new uses the actual subclass rather than id_.type.
self.new_type = new_type
def __getattr__(self, attr: str) -> "MockObj":
# prevent python dunder methods being replaced by mock objects
@@ -889,7 +903,9 @@ class MockObj(Expression):
def __call__(self, *args: SafeExpType) -> "MockObj":
call = CallExpression(self.base, *args)
return MockObj(call, self.op, is_new_expr=self.is_new_expr)
return MockObj(
call, self.op, is_new_expr=self.is_new_expr, new_type=self.new_type
)
def __str__(self):
return str(self.base)
@@ -903,7 +919,7 @@ class MockObj(Expression):
@property
def new(self) -> "MockObj":
return MockObj(f"new {self.base}", "->", is_new_expr=True)
return MockObj(f"new {self.base}", "->", is_new_expr=True, new_type=self.base)
def template(self, *args: SafeExpType) -> "MockObj":
"""Apply template parameters to this object."""

View File

@@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [
]
def _strip_win_long_path_prefix(path: str) -> str:
r"""Strip the Windows extended-length path prefix from ``path``.
Handles both forms documented at
https://learn.microsoft.com/windows/win32/fileio/naming-a-file:
* ``\\?\C:\path\to\file`` -> ``C:\path\to\file``
* ``\\?\UNC\server\share\path`` -> ``\\server\share\path``
The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with
``sys.executable`` already prefixed with ``\\?\``. That prefix propagates
into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from
the environment, falling back to ``os.path.normpath(sys.executable)``)
and ends up baked into SCons-emitted command lines for build steps such
as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand
the ``\\?\`` prefix, so the build fails with
"The system cannot find the path specified." Stripping the prefix early
keeps the path shell-quotable.
No-op on non-Windows platforms.
"""
if sys.platform != "win32":
return path
if path.startswith("\\\\?\\UNC\\"):
# \\?\UNC\server\share\... -> \\server\share\...
return "\\\\" + path[len("\\\\?\\UNC\\") :]
if path.startswith("\\\\?\\"):
return path[len("\\\\?\\") :]
return path
def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
@@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
# Increase uv retry count to handle transient network errors (default is 3)
os.environ.setdefault("UV_HTTP_RETRIES", "10")
cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args)
# Strip the Windows extended-length path prefix from sys.executable so it
# doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted
# command lines run through cmd.exe.
python_exe = _strip_win_long_path_prefix(sys.executable)
if python_exe != sys.executable:
# Only override PYTHONEXEPATH when we actually stripped a prefix.
# PlatformIO's get_pythonexe_path() reads this and falls back to
# sys.executable otherwise; setting it unconditionally would clobber
# a user-provided value (or the unmodified path on platforms that
# don't need the strip).
os.environ["PYTHONEXEPATH"] = python_exe
cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args)
return run_external_process(*cmd, **kwargs)

View File

@@ -94,13 +94,29 @@ def safe_print(message="", end="\n"):
except UnicodeEncodeError:
pass
# Fall back to the stream's actual encoding (e.g. cp1252 on Windows
# redirected pipes). Use "backslashreplace" so unencodable code points
# like the wifi signal-bar block characters (U+2582..U+2588) become
# readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print``
# never receives a ``bytes`` object (which would render as a ``b'...'``
# repr).
encoding = sys.stdout.encoding or "ascii"
try:
print(message.encode("utf-8", "backslashreplace"), end=end)
print(
message.encode(encoding, "backslashreplace").decode(encoding),
end=end,
)
return
except UnicodeEncodeError:
try:
print(message.encode("ascii", "backslashreplace"), end=end)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
pass
try:
print(
message.encode("ascii", "backslashreplace").decode("ascii"),
end=end,
)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
def safe_input(prompt=""):

View File

@@ -0,0 +1,20 @@
esphome:
name: test
esp32:
board: esp32dev
framework:
type: arduino
spi:
clk_pin: GPIO18
mosi_pin: GPIO23
display:
- platform: ili9xxx
id: tft_display
model: ST7789V
cs_pin: GPIO5
dc_pin: GPIO17
reset_pin: GPIO16
invert_colors: false

View File

@@ -0,0 +1,31 @@
"""Tests for the ili9xxx component."""
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
def test_ili9xxx_placement_new_uses_model_subclass(
generate_main: Callable[[str | Path], str],
component_config_path: Callable[[str], Path],
) -> None:
"""Regression test for ili9xxx picking the right constructor under placement new.
ili9xxx declares the ID as the base ``ILI9XXXDisplay`` but constructs a
model-specific subclass (e.g. ``ILI9XXXST7789V``) via ``MODELS[...].new()``.
Pvariable must emit placement new for the subclass — otherwise the base
default constructor runs and the panel is left with a null init sequence
and 0x0 dimensions, producing a silent blank screen.
"""
main_cpp = generate_main(component_config_path("ili9xxx_test.yaml"))
# Storage is sized for the subclass so the full object fits.
assert "sizeof(ili9xxx::ILI9XXXST7789V)" in main_cpp
assert "alignas(ili9xxx::ILI9XXXST7789V)" in main_cpp
# Pointer is declared as the base type for polymorphism.
assert "static ili9xxx::ILI9XXXDisplay *const tft_display" in main_cpp
# Placement new runs the subclass constructor — this is the actual regression fix.
assert "new(tft_display) ili9xxx::ILI9XXXST7789V()" in main_cpp
# Base-class default constructor must NOT be used.
assert "new(tft_display) ili9xxx::ILI9XXXDisplay()" not in main_cpp

View File

@@ -7,10 +7,12 @@ from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
from PIL import Image as PILImage
import pytest
from esphome import config_validation as cv
from esphome.components.image import (
CONF_ALPHA_CHANNEL,
CONF_INVERT_ALPHA,
CONF_OPAQUE,
CONF_TRANSPARENCY,
@@ -411,3 +413,70 @@ async def test_svg_with_mm_dimensions_succeeds(
assert 30 < height < 50, (
f"Height should be around 39 pixels for 10mm at 100dpi, got {height}"
)
@pytest.mark.asyncio
async def test_rgb565_alpha_animation_layout_per_frame(
tmp_path: Path,
mock_progmem_array: MagicMock,
) -> None:
"""RGB565+alpha animations must store each frame as a self-contained
[RGB plane | alpha plane] block. Animation::update_data_start_ steps frames
with a single per-frame stride, so any cross-frame layout (all RGB then all
alpha) makes the C++ alpha read land in the next frame's RGB bytes — that
was the regression behind issue #15999.
"""
# Build a 2-frame APNG where each frame is a solid color with a known
# alpha. APNG preserves full RGBA per pixel (GIF only has 1-bit alpha so
# round-tripping mid-range alpha values does not work). Frame 0 is fully
# opaque red, frame 1 is fully transparent blue.
width = 4
height = 3
frame0 = PILImage.new("RGBA", (width, height), (255, 0, 0, 0xFF))
frame1 = PILImage.new("RGBA", (width, height), (0, 0, 255, 0x00))
apng_path = tmp_path / "anim.png"
frame0.save(
apng_path,
format="PNG",
save_all=True,
append_images=[frame1],
duration=100,
loop=0,
)
config = {
CONF_FILE: str(apng_path),
CONF_TYPE: "RGB565",
CONF_TRANSPARENCY: CONF_ALPHA_CHANNEL,
CONF_DITHER: "NONE",
CONF_INVERT_ALPHA: False,
CONF_RAW_DATA_ID: "test_raw_data_id",
}
_, _, _, _, _, frame_count = await write_image(config, all_frames=True)
assert frame_count == 2
# Recover the bytes handed to progmem_array. Signature is (id_, rhs).
_, raw_data = mock_progmem_array.call_args.args
data = [int(x) for x in raw_data]
rgb_size = width * height * 2
alpha_size = width * height
frame_size = rgb_size + alpha_size
assert len(data) == frame_size * frame_count, (
"RGB565+alpha animation buffer must be (RGB + alpha) per frame, not "
"all RGB followed by all alpha"
)
# Frame 0: RGB plane is red, alpha plane is 0xFF. Frame 1: alpha plane is
# 0x00. If the layout regresses to [all RGB | all alpha], the alpha bytes
# would all land at the tail of the buffer and the per-frame slices below
# would point at RGB565 noise instead.
frame0_alpha = data[rgb_size : rgb_size + alpha_size]
frame1_alpha = data[frame_size + rgb_size : frame_size + rgb_size + alpha_size]
assert all(a == 0xFF for a in frame0_alpha), (
f"Frame 0 alpha plane should be opaque, got {frame0_alpha}"
)
assert all(a == 0x00 for a in frame1_alpha), (
f"Frame 1 alpha plane should be transparent, got {frame1_alpha}"
)

View File

@@ -91,6 +91,24 @@ api:
- float_arr.size()
- string_arr[0].c_str()
- string_arr.size()
# Test array + string args used after a non-synchronous action (delay).
# The default non-owning types (StringRef, const FixedVector&) would
# dangle once rx_buf_ is reused, and FixedVector is non-copyable so
# DelayAction's lambda capture would fail to compile. The api codegen
# must fall back to owning std::string / std::vector here.
- action: array_with_delay
variables:
name: string
int_arr: int[]
string_arr: string[]
then:
- delay: 20ms
- logger.log:
format: "Delayed: %s (%u ints, %u strings)"
args:
- name.c_str()
- int_arr.size()
- string_arr.size()
# Test ContinuationAction (IfAction with then/else branches)
- action: test_if_action
variables:

View File

@@ -4,3 +4,9 @@ esphome:
- deep_sleep.prevent
- delay: 1s
- deep_sleep.allow
- if:
condition:
lambda: 'return false;'
then:
- deep_sleep.enter:
sleep_duration: 60min

View File

@@ -0,0 +1,7 @@
*** DO NOT USE THIS KEY...EVER ***
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIEZIp96p7Z7QN6vxOFE5FdRNm535vW81Ax07KnGxVjiMoAoGCCqGSM49
AwEHoUQDQgAEK+fBQDn1Q+r5lGwcDoMUgeg2Aq16LLrLUz7xWI6mS0PUClzolDIo
eaV/Pfjl7zAvkbQQsZq3rTNnr1eGAk5P+A==
-----END EC PRIVATE KEY-----
*** DO NOT USE THIS KEY...EVER ***

View File

@@ -0,0 +1,10 @@
esp32:
variant: esp32
framework:
type: esp-idf
advanced:
signed_ota_verification:
signing_key: ../../components/esp32/dummy_signing_key_v1_ecdsa.pem
signing_scheme: ecdsa_v1
<<: !include common.yaml

View File

@@ -1,6 +1,11 @@
"""Tests for time component cron expression parsing."""
from esphome.components.time import _parse_cron_part
import errno
from unittest.mock import MagicMock, patch
import pytest
from esphome.components.time import _load_tzdata, _parse_cron_part, validate_tz
def test_star_slash_seconds() -> None:
@@ -78,3 +83,63 @@ def test_range() -> None:
def test_single_value() -> None:
assert _parse_cron_part("30", 0, 59, {}) == {30}
def _mock_resources_with_error(error: Exception) -> MagicMock:
"""Return a mock of importlib.resources.files where read_bytes raises error."""
leaf = MagicMock()
leaf.read_bytes.side_effect = error
package = MagicMock()
package.__truediv__.return_value = leaf
return MagicMock(return_value=package)
def test_load_tzdata_returns_none_on_windows_einval() -> None:
"""On Windows, opening a tzdata path with NTFS-illegal chars raises OSError(EINVAL).
Regression test for crash when the system TZ resolves to a POSIX string like
"<+08>-8" (Asia/Shanghai, IST, etc.) and is fed back into _load_tzdata by
validate_tz to check whether it is also a valid IANA key.
"""
err = OSError(errno.EINVAL, "Invalid argument")
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(err),
):
assert _load_tzdata("<+08>-8") is None
def test_load_tzdata_propagates_unexpected_oserror() -> None:
"""Unrelated OSErrors (e.g. PermissionError) must not be swallowed."""
with (
patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(
PermissionError(errno.EACCES, "Permission denied")
),
),
pytest.raises(PermissionError),
):
_load_tzdata("Some/Zone")
def test_load_tzdata_returns_none_on_file_not_found() -> None:
"""Existing behavior: missing tz file returns None rather than raising."""
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(FileNotFoundError()),
):
assert _load_tzdata("Not/A/Zone") is None
def test_validate_tz_accepts_posix_string_when_read_bytes_raises_einval() -> None:
"""validate_tz must not crash when _load_tzdata hits the Windows EINVAL path.
Simulates the Windows case where the auto-detected POSIX TZ string is fed
back through _load_tzdata and the underlying read_bytes raises errno 22.
"""
with patch(
"esphome.components.time.resources.files",
_mock_resources_with_error(OSError(errno.EINVAL, "Invalid argument")),
):
assert validate_tz("<+08>-8") == "<+08>-8"

View File

@@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables(
assert "arg" in args
@pytest.mark.parametrize(
("platform", "input_path", "expected"),
[
# win32: drive-letter extended-length prefix is stripped
(
"win32",
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# win32: UNC extended-length prefix is translated to a regular UNC path
(
"win32",
"\\\\?\\UNC\\server\\share\\python.exe",
"\\\\server\\share\\python.exe",
),
# win32: paths without the prefix are returned unchanged
(
"win32",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# non-win32: prefix is left alone (no-op)
("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"),
("darwin", "/usr/bin/python3", "/usr/bin/python3"),
],
)
def test_strip_win_long_path_prefix(
platform: str, input_path: str, expected: str
) -> None:
r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32."""
with patch("esphome.platformio_api.sys.platform", platform):
assert platformio_api._strip_win_long_path_prefix(input_path) == expected
def test_run_platformio_cli_strips_win_long_path_prefix(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess.
The NSIS-installed esphome.exe launcher starts Python with
``sys.executable`` already prefixed by the extended-length path marker.
That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and
break SCons-emitted command lines run through ``cmd.exe``.
"""
CORE.build_path = str(setup_core / "build" / "test")
prefixed_exe = (
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
stripped_exe = (
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "win32"),
patch("esphome.platformio_api.sys.executable", prefixed_exe),
):
# Pop any pre-existing PYTHONEXEPATH so the assertion below reflects
# what run_platformio_cli set, not whatever the test runner's
# environment happened to contain.
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# The subprocess is invoked with the stripped executable path.
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == stripped_exe
# PYTHONEXEPATH is exported with the stripped path so PlatformIO's
# get_pythonexe_path() picks it up in the subprocess.
assert os.environ["PYTHONEXEPATH"] == stripped_exe
def test_run_platformio_cli_does_not_set_pythonexepath_without_strip(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix.
Setting it unconditionally would clobber a user-provided value (or
interfere with non-Windows tooling that has no prefix to strip).
"""
CORE.build_path = str(setup_core / "build" / "test")
plain_exe = "/usr/bin/python3"
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "linux"),
patch("esphome.platformio_api.sys.executable", plain_exe),
):
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == plain_exe
assert "PYTHONEXEPATH" not in os.environ
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:

View File

@@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None:
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
class TestSafePrint:
"""Tests for ``safe_print`` and its UnicodeEncodeError fallback chain."""
@pytest.fixture(autouse=True)
def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Default ``CORE.dashboard`` to False so each test starts hermetic."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", False)
def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None:
"""ASCII-only messages take the fast path through native ``print``."""
util.safe_print("hello world")
assert capsys.readouterr().out == "hello world\n"
def test_prints_unicode_on_utf8_stdout(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Non-ASCII goes straight through when stdout can encode it."""
util.safe_print("bars: \u2582\u2584\u2586\u2588")
assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n"
def test_dashboard_escapes_esc_byte(
self,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
util.safe_print("\033[0;32mhi\033[0m")
assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n"
def test_fallback_writes_string_not_bytes_repr(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Regression: cp1252 fallback must produce a printable str, not ``b'...'``.
On Windows, when stdout is a redirected pipe (e.g. the dashboard),
Python uses cp1252, which cannot encode the wifi signal-bar block
characters (U+2582..U+2588). The previous fallback path called
``print(message.encode(...))`` with a ``bytes`` object, which
Python's ``print`` rendered as a literal ``b'...'`` repr — visible
in the user's dashboard output. The fix re-encodes through the
stream's encoding with ``backslashreplace`` and decodes back to
``str``.
"""
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("bars: \u2582\u2584\u2586\u2588 done")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Output is a clean line, not the bytes repr.
assert not output.startswith("b'")
assert "b'bars" not in output
# Unencodable codepoints become readable backslash escapes.
assert "\\u2582\\u2584\\u2586\\u2588" in output
# Encodable parts survive unchanged.
assert "bars: " in output
assert " done" in output
assert output.endswith("\n")
def test_fallback_with_dashboard_escaped_message(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Dashboard ESC escaping + cp1252 fallback compose correctly."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Dashboard escaping turned ESC into literal "\033" (5 chars), which
# cp1252 can encode, so it survives the round-trip verbatim.
assert "\\033[0;32m" in output
assert "\\033[0m" in output
# Block characters became backslash escapes via backslashreplace.
assert "\\u2582\\u2584\\u2586\\u2588" in output
def test_final_message_when_locale_is_invalid(
self,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
"""If every encoding path fails, surface the locale-error sentinel."""
original_print = print
call_count = 0
def fake_print(*args: Any, **kwargs: Any) -> None:
nonlocal call_count
call_count += 1
# The first three calls are: native print, stream-encoding
# fallback, ASCII fallback. Make all three raise so we reach
# the final sentinel "Cannot print line..." which is expected
# to succeed (no encoding required).
if call_count <= 3:
raise UnicodeEncodeError("ascii", "x", 0, 1, "boom")
original_print(*args, **kwargs)
monkeypatch.setattr("builtins.print", fake_print)
util.safe_print("x")
assert call_count == 4
assert (
capsys.readouterr().out == "Cannot print line because of invalid locale!\n"
)