mirror of
https://github.com/esphome/esphome.git
synced 2026-06-26 02:23:59 +00:00
Compare commits
32 Commits
config-ver
...
2026.4.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fda5f41b2 | ||
|
|
197d4dac8e | ||
|
|
2d7f9dc48d | ||
|
|
be84e6c9f4 | ||
|
|
0418f2138a | ||
|
|
d9c22d6b56 | ||
|
|
60a94fd109 | ||
|
|
9371ec319a | ||
|
|
ce466c6b60 | ||
|
|
a460f5343c | ||
|
|
4e0509435a | ||
|
|
95b5ab7e78 | ||
|
|
3ac0939f55 | ||
|
|
191d3bc7e4 | ||
|
|
a186f6fea9 | ||
|
|
aea88aef5e | ||
|
|
433bbdb016 | ||
|
|
4137d93cbf | ||
|
|
6a5919ee87 | ||
|
|
b753ee4e94 | ||
|
|
c26ea52620 | ||
|
|
6ca5b31fab | ||
|
|
00b71208a6 | ||
|
|
76eb8f697f | ||
|
|
2a3bd8bc85 | ||
|
|
629da4d878 | ||
|
|
5c2ceb63e0 | ||
|
|
92cb6dd7fd | ||
|
|
06e5931ad7 | ||
|
|
dc5b06285d | ||
|
|
3d0a2421a6 | ||
|
|
22f6791dea |
2
Doxyfile
2
Doxyfile
@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
|
||||
# could be handy for archiving the generated documentation or if some version
|
||||
# control system is used.
|
||||
|
||||
PROJECT_NUMBER = 2026.4.1
|
||||
PROJECT_NUMBER = 2026.4.4
|
||||
|
||||
# Using the PROJECT_BRIEF tag one can provide an optional one line description
|
||||
# for a project that appears at the top of each page and should give viewer a
|
||||
|
||||
@@ -598,7 +598,7 @@ async def component_resume_action_to_code(
|
||||
comp = await cg.get_variable(config[CONF_ID])
|
||||
var = cg.new_Pvariable(action_id, template_arg, comp)
|
||||
if CONF_UPDATE_INTERVAL in config:
|
||||
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
|
||||
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32)
|
||||
cg.add(var.set_update_interval(template_))
|
||||
return var
|
||||
|
||||
|
||||
@@ -62,7 +62,12 @@ void Animation::set_frame(int frame) {
|
||||
}
|
||||
|
||||
void Animation::update_data_start_() {
|
||||
const uint32_t image_size = this->get_width_stride() * this->height_;
|
||||
uint32_t image_size = this->get_width_stride() * this->height_;
|
||||
// RGB565 with an alpha channel stores the alpha plane immediately after the RGB
|
||||
// plane within each frame, so the per-frame stride includes the alpha bytes.
|
||||
if (this->type_ == image::IMAGE_TYPE_RGB565 && this->transparency_ == image::TRANSPARENCY_ALPHA_CHANNEL) {
|
||||
image_size += static_cast<uint32_t>(this->width_) * this->height_;
|
||||
}
|
||||
this->data_start_ = this->animation_data_start_ + image_size * this->current_frame_;
|
||||
}
|
||||
|
||||
|
||||
@@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_(
|
||||
|
||||
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
|
||||
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
|
||||
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
|
||||
# Owning element type for each YAML service variable type. Used to derive both
|
||||
# the zero-copy native types and the owning fallback types below.
|
||||
_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = {
|
||||
"bool": cg.bool_,
|
||||
"int": cg.int32,
|
||||
"float": cg.float_,
|
||||
"string": cg.std_string,
|
||||
}
|
||||
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
|
||||
# Scalars are passed by value; string uses a non-owning view into rx_buf_.
|
||||
**_SERVICE_ARG_SCALAR_TYPES,
|
||||
"string": cg.StringRef,
|
||||
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
|
||||
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
|
||||
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
|
||||
"string[]": cg.FixedVector.template(cg.std_string)
|
||||
.operator("const")
|
||||
.operator("ref"),
|
||||
# Arrays are passed as non-owning const references into rx_buf_.
|
||||
**{
|
||||
f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref")
|
||||
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
|
||||
},
|
||||
}
|
||||
# Owning fallback types used when the action chain contains non-synchronous actions
|
||||
# (delay, wait_until, script.wait, etc.). The default non-owning types reference
|
||||
# storage in the receive buffer, which is reused once the synchronous portion of
|
||||
# the chain returns. FixedVector is also non-copyable, so the deferred lambda
|
||||
# capture in DelayAction::play_complex would fail to compile.
|
||||
SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = {
|
||||
"string": cg.std_string,
|
||||
**{
|
||||
f"{name}[]": cg.std_vector.template(t)
|
||||
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
|
||||
},
|
||||
}
|
||||
CONF_ENCRYPTION = "encryption"
|
||||
CONF_BATCH_DELAY = "batch_delay"
|
||||
@@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None:
|
||||
func_args.append((cg.bool_, "return_response"))
|
||||
|
||||
# Check if action chain has non-synchronous actions that would make
|
||||
# non-owning StringRef dangle (rx_buf_ reused after delay)
|
||||
# non-owning args (StringRef, const FixedVector&) dangle once the
|
||||
# rx_buf_ is reused after a delay/wait_until/script.wait/etc. The
|
||||
# FixedVector references would also fail to compile because they
|
||||
# are non-copyable and DelayAction captures args by value.
|
||||
has_non_synchronous = automation.has_non_synchronous_actions(
|
||||
conf.get(CONF_THEN, [])
|
||||
)
|
||||
|
||||
service_arg_names: list[str] = []
|
||||
for name, var_ in conf[CONF_VARIABLES].items():
|
||||
native = SERVICE_ARG_NATIVE_TYPES[var_]
|
||||
# Fall back to std::string for string args if non-synchronous actions exist
|
||||
if has_non_synchronous and native is cg.StringRef:
|
||||
native = cg.std_string
|
||||
if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES:
|
||||
native = SERVICE_ARG_FALLBACK_TYPES[var_]
|
||||
else:
|
||||
native = SERVICE_ARG_NATIVE_TYPES[var_]
|
||||
service_template_args.append(native)
|
||||
func_args.append((native, name))
|
||||
service_arg_names.append(name)
|
||||
|
||||
@@ -20,6 +20,7 @@ import contextlib
|
||||
from esphome.const import CONF_KEY, CONF_PORT, __version__
|
||||
from esphome.core import CORE
|
||||
from esphome.platformio_api import process_stacktrace
|
||||
from esphome.util import safe_print
|
||||
|
||||
from . import CONF_ENCRYPTION
|
||||
|
||||
@@ -60,7 +61,6 @@ async def async_run_logs(
|
||||
noise_psk=noise_psk,
|
||||
addresses=addresses, # Pass all addresses for automatic retry
|
||||
)
|
||||
dashboard = CORE.dashboard
|
||||
backtrace_state = False
|
||||
|
||||
# Try platform-specific stacktrace handler first, fall back to generic
|
||||
@@ -82,7 +82,11 @@ async def async_run_logs(
|
||||
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
|
||||
)
|
||||
for parsed_msg in parse_log_message(text, timestamp):
|
||||
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
|
||||
# safe_print handles the dashboard \033 escaping and falls back
|
||||
# to backslashreplace encoding on stdouts that can't represent
|
||||
# the wifi signal-bar block characters (Windows redirected
|
||||
# cp1252 pipe).
|
||||
safe_print(parsed_msg)
|
||||
for raw_line in text.splitlines():
|
||||
if platform_process_stacktrace:
|
||||
backtrace_state = platform_process_stacktrace(
|
||||
|
||||
@@ -183,19 +183,19 @@ async def at581x_settings_to_code(config, action_id, template_arg, args):
|
||||
cg.add(var.set_sensing_distance(template_))
|
||||
|
||||
if selfcheck := config.get(CONF_POWERON_SELFCHECK_TIME):
|
||||
template_ = await cg.templatable(selfcheck, args, cg.int32)
|
||||
template_ = await cg.templatable(selfcheck, args, cg.int_)
|
||||
cg.add(var.set_poweron_selfcheck_time(template_))
|
||||
|
||||
if protect := config.get(CONF_PROTECT_TIME):
|
||||
template_ = await cg.templatable(protect, args, cg.int32)
|
||||
template_ = await cg.templatable(protect, args, cg.int_)
|
||||
cg.add(var.set_protect_time(template_))
|
||||
|
||||
if trig_base := config.get(CONF_TRIGGER_BASE):
|
||||
template_ = await cg.templatable(trig_base, args, cg.int32)
|
||||
template_ = await cg.templatable(trig_base, args, cg.int_)
|
||||
cg.add(var.set_trigger_base(template_))
|
||||
|
||||
if trig_keep := config.get(CONF_TRIGGER_KEEP):
|
||||
template_ = await cg.templatable(trig_keep, args, cg.int32)
|
||||
template_ = await cg.templatable(trig_keep, args, cg.int_)
|
||||
cg.add(var.set_trigger_keep(template_))
|
||||
|
||||
if (stage_gain := config.get(CONF_STAGE_GAIN)) is not None:
|
||||
|
||||
@@ -413,7 +413,7 @@ async def deep_sleep_enter_to_code(config, action_id, template_arg, args):
|
||||
paren = await cg.get_variable(config[CONF_ID])
|
||||
var = cg.new_Pvariable(action_id, template_arg, paren)
|
||||
if CONF_SLEEP_DURATION in config:
|
||||
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.int32)
|
||||
template_ = await cg.templatable(config[CONF_SLEEP_DURATION], args, cg.uint32)
|
||||
cg.add(var.set_sleep_duration(template_))
|
||||
|
||||
if CONF_UNTIL in config:
|
||||
|
||||
@@ -128,23 +128,30 @@ ASSERTION_LEVELS = {
|
||||
SIGNING_SCHEMES = {
|
||||
"rsa3072": "CONFIG_SECURE_SIGNED_APPS_RSA_SCHEME",
|
||||
"ecdsa256": "CONFIG_SECURE_SIGNED_APPS_ECDSA_V2_SCHEME",
|
||||
"ecdsa_v1": "CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME",
|
||||
}
|
||||
|
||||
# Chip variants that only support one signing scheme for Secure Boot V2.
|
||||
# Chip variants that only support one V2 signing scheme.
|
||||
# Based on SOC_SECURE_BOOT_V2_RSA / SOC_SECURE_BOOT_V2_ECC in soc_caps.h.
|
||||
# Variants not listed in either set support both RSA and ECDSA
|
||||
# Variants not listed in either set support both RSA and ECDSA V2
|
||||
# (e.g. C5, C6, H2, P4). New variants should be added to the
|
||||
# appropriate set if they only support one scheme.
|
||||
SIGNED_OTA_RSA_ONLY_VARIANTS = {
|
||||
VARIANT_ESP32,
|
||||
# Note: VARIANT_ESP32 is not listed here because it supports V2 RSA only
|
||||
# when minimum_chip_revision >= 3.0, which requires special handling.
|
||||
SIGNED_OTA_V2_RSA_ONLY_VARIANTS = {
|
||||
VARIANT_ESP32S2,
|
||||
VARIANT_ESP32S3,
|
||||
VARIANT_ESP32C3,
|
||||
}
|
||||
SIGNED_OTA_ECC_ONLY_VARIANTS = {
|
||||
SIGNED_OTA_V2_ECC_ONLY_VARIANTS = {
|
||||
VARIANT_ESP32C2,
|
||||
VARIANT_ESP32C61,
|
||||
}
|
||||
# V1 ECDSA (Secure Boot V1) is only supported on the original ESP32.
|
||||
# Based on SOC_SECURE_BOOT_V1 in soc_caps.h.
|
||||
SIGNED_OTA_V1_ECDSA_VARIANTS = {
|
||||
VARIANT_ESP32,
|
||||
}
|
||||
|
||||
COMPILER_OPTIMIZATIONS = {
|
||||
"DEBUG": "CONFIG_COMPILER_OPTIMIZATION_DEBUG",
|
||||
@@ -991,25 +998,73 @@ def final_validate(config):
|
||||
if signed_ota := advanced.get(CONF_SIGNED_OTA_VERIFICATION):
|
||||
scheme = signed_ota[CONF_SIGNING_SCHEME]
|
||||
variant = config[CONF_VARIANT]
|
||||
scheme_variant_conflicts = {
|
||||
"ecdsa256": (SIGNED_OTA_RSA_ONLY_VARIANTS, "rsa3072"),
|
||||
"rsa3072": (SIGNED_OTA_ECC_ONLY_VARIANTS, "ecdsa256"),
|
||||
}
|
||||
if (conflict := scheme_variant_conflicts.get(scheme)) and variant in conflict[
|
||||
0
|
||||
]:
|
||||
min_rev = advanced.get(CONF_MINIMUM_CHIP_REVISION)
|
||||
scheme_path = [
|
||||
CONF_FRAMEWORK,
|
||||
CONF_ADVANCED,
|
||||
CONF_SIGNED_OTA_VERIFICATION,
|
||||
CONF_SIGNING_SCHEME,
|
||||
]
|
||||
|
||||
# V1 ECDSA is only available on the original ESP32
|
||||
if scheme == "ecdsa_v1" and variant not in SIGNED_OTA_V1_ECDSA_VARIANTS:
|
||||
errs.append(
|
||||
cv.Invalid(
|
||||
f"Signing scheme '{scheme}' is not supported on "
|
||||
f"{VARIANT_FRIENDLY[variant]}. Use '{conflict[1]}' instead.",
|
||||
path=[
|
||||
CONF_FRAMEWORK,
|
||||
CONF_ADVANCED,
|
||||
CONF_SIGNED_OTA_VERIFICATION,
|
||||
CONF_SIGNING_SCHEME,
|
||||
],
|
||||
f"Signing scheme 'ecdsa_v1' is only supported on "
|
||||
f"{VARIANT_FRIENDLY[VARIANT_ESP32]}. "
|
||||
f"Use 'rsa3072' or 'ecdsa256' instead.",
|
||||
path=scheme_path,
|
||||
)
|
||||
)
|
||||
elif variant == VARIANT_ESP32:
|
||||
# On ESP32, V2 RSA requires minimum_chip_revision >= 3.0
|
||||
# Note: string comparison works here because cv.one_of constrains
|
||||
# min_rev to known ESP32_CHIP_REVISIONS values ("0.0".."3.1").
|
||||
if scheme == "rsa3072" and (min_rev is None or min_rev < "3.0"):
|
||||
errs.append(
|
||||
cv.Invalid(
|
||||
f"Signing scheme 'rsa3072' on {VARIANT_FRIENDLY[variant]} "
|
||||
f"requires minimum_chip_revision: '3.0' or higher "
|
||||
f"(Secure Boot V2 RSA needs chip revision 3.0+). "
|
||||
f"For older chip revisions, use 'ecdsa_v1' instead.",
|
||||
path=scheme_path,
|
||||
)
|
||||
)
|
||||
# ESP32 does not support V2 ECDSA (no SOC_SECURE_BOOT_V2_ECC)
|
||||
elif scheme == "ecdsa256":
|
||||
errs.append(
|
||||
cv.Invalid(
|
||||
f"Signing scheme 'ecdsa256' is not supported on "
|
||||
f"{VARIANT_FRIENDLY[variant]}. Use 'rsa3072' (with "
|
||||
f"minimum_chip_revision: '3.0') or 'ecdsa_v1' instead.",
|
||||
path=scheme_path,
|
||||
)
|
||||
)
|
||||
# V1 on rev 3.0+ -- suggest V2 RSA for stronger security
|
||||
elif scheme == "ecdsa_v1" and min_rev is not None and min_rev >= "3.0":
|
||||
_LOGGER.info(
|
||||
"Using Secure Boot V1 ECDSA on %s rev %s. "
|
||||
"Consider using 'rsa3072' (Secure Boot V2 RSA) for "
|
||||
"stronger security on chip revision 3.0+.",
|
||||
VARIANT_FRIENDLY[variant],
|
||||
min_rev,
|
||||
)
|
||||
else:
|
||||
# Non-ESP32 variants: check V2 scheme-variant compatibility
|
||||
scheme_variant_conflicts = {
|
||||
"ecdsa256": (SIGNED_OTA_V2_RSA_ONLY_VARIANTS, "rsa3072"),
|
||||
"rsa3072": (SIGNED_OTA_V2_ECC_ONLY_VARIANTS, "ecdsa256"),
|
||||
}
|
||||
if (
|
||||
conflict := scheme_variant_conflicts.get(scheme)
|
||||
) and variant in conflict[0]:
|
||||
errs.append(
|
||||
cv.Invalid(
|
||||
f"Signing scheme '{scheme}' is not supported on "
|
||||
f"{VARIANT_FRIENDLY[variant]}. Use '{conflict[1]}' instead.",
|
||||
path=scheme_path,
|
||||
)
|
||||
)
|
||||
if CONF_OTA not in full_config:
|
||||
_LOGGER.warning(
|
||||
"Signed OTA verification is enabled but no OTA component is configured. "
|
||||
@@ -1685,7 +1740,17 @@ async def to_code(config):
|
||||
|
||||
# Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r
|
||||
# (~11 KB). See printf_stubs.cpp for implementation.
|
||||
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]:
|
||||
#
|
||||
# The wrap is only beneficial against newlib. Picolibc's tinystdio
|
||||
# implements vsnprintf by building a string-output FILE and calling
|
||||
# vfprintf, so vfprintf is unconditionally linked in by any caller
|
||||
# of snprintf/vsnprintf — effectively every build — and the wrap
|
||||
# saves nothing while costing ~170 B of shim. IDF 5.x defaults to
|
||||
# newlib on every variant; IDF 6.0+ switches to picolibc on every
|
||||
# variant.
|
||||
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version(
|
||||
6, 0, 0
|
||||
):
|
||||
cg.add_define("USE_FULL_PRINTF")
|
||||
else:
|
||||
for symbol in ("vprintf", "printf", "fprintf", "vfprintf"):
|
||||
|
||||
@@ -5,6 +5,7 @@ import json # noqa: E402
|
||||
import os # noqa: E402
|
||||
import pathlib # noqa: E402
|
||||
import shutil # noqa: E402
|
||||
import subprocess # noqa: E402
|
||||
from glob import glob # noqa: E402
|
||||
|
||||
|
||||
@@ -25,6 +26,114 @@ def _parse_sdkconfig(sdkconfig_path):
|
||||
return options
|
||||
|
||||
|
||||
def _generate_v1_verification_key(env):
|
||||
"""Generate the V1 ECDSA verification key binary and assembly source file.
|
||||
|
||||
Secure Boot V1 embeds the public verification key directly in the app binary
|
||||
as a compiled object (via a .S assembly file). The ESP-IDF CMake build generates
|
||||
these files via custom commands, but PlatformIO's SCons bridge does not execute
|
||||
them. This function replicates that logic:
|
||||
1. Extracts the raw public key from the PEM signing key using espsecure.
|
||||
2. Generates the .S assembly source that embeds the key bytes.
|
||||
"""
|
||||
build_dir = pathlib.Path(env.subst("$BUILD_DIR"))
|
||||
project_dir = pathlib.Path(env.subst("$PROJECT_DIR"))
|
||||
pioenv = env.subst("$PIOENV")
|
||||
sdkconfig = _parse_sdkconfig(project_dir / f"sdkconfig.{pioenv}")
|
||||
|
||||
if sdkconfig.get("CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME") != "y":
|
||||
return
|
||||
|
||||
bin_path = build_dir / "signature_verification_key.bin"
|
||||
asm_path = build_dir / "signature_verification_key.bin.S"
|
||||
|
||||
# Determine the source of the verification key
|
||||
if sdkconfig.get("CONFIG_SECURE_BOOT_BUILD_SIGNED_BINARIES") == "y":
|
||||
# Extract public key from the signing key
|
||||
signing_key = sdkconfig.get("CONFIG_SECURE_BOOT_SIGNING_KEY")
|
||||
if not signing_key:
|
||||
return
|
||||
signing_key_path = pathlib.Path(signing_key)
|
||||
if not signing_key_path.exists():
|
||||
print(f"Error: V1 ECDSA signing key not found: {signing_key_path}")
|
||||
env.Exit(1)
|
||||
return
|
||||
|
||||
if not bin_path.exists() or bin_path.stat().st_mtime < signing_key_path.stat().st_mtime:
|
||||
python_exe = env.subst("$PYTHONEXE")
|
||||
result = subprocess.run(
|
||||
[python_exe, "-m", "espsecure", "extract_public_key",
|
||||
"--keyfile", str(signing_key_path), str(bin_path)],
|
||||
capture_output=True, text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
print(f"Error extracting V1 verification key: {result.stderr}")
|
||||
env.Exit(1)
|
||||
return
|
||||
print(f"Extracted V1 ECDSA verification key from {signing_key_path.name}")
|
||||
else:
|
||||
# User-provided verification key -- should already be a raw binary file
|
||||
verification_key = sdkconfig.get("CONFIG_SECURE_BOOT_VERIFICATION_KEY")
|
||||
if not verification_key:
|
||||
return
|
||||
verification_key_path = pathlib.Path(verification_key)
|
||||
if not verification_key_path.exists():
|
||||
print(f"Error: Verification key not found: {verification_key_path}")
|
||||
env.Exit(1)
|
||||
return
|
||||
shutil.copyfile(str(verification_key_path), str(bin_path))
|
||||
|
||||
if not bin_path.exists():
|
||||
return
|
||||
|
||||
# Generate the .S assembly file from the binary key data.
|
||||
# Replicates ESP-IDF's data_file_embed_asm.cmake with RENAME_TO=signature_verification_key_bin.
|
||||
# The file is needed in both the app build dir and the bootloader build dir, since
|
||||
# the bootloader also embeds the verification key when CONFIG_SECURE_SIGNED_ON_BOOT_NO_SECURE_BOOT
|
||||
# is enabled. PlatformIO's SCons bridge does not execute the CMake custom commands that
|
||||
# normally generate these files.
|
||||
data = bin_path.read_bytes()
|
||||
varname = "signature_verification_key_bin"
|
||||
|
||||
lines = []
|
||||
lines.append(f"/* Data converted from {bin_path.name} */")
|
||||
lines.append(".data")
|
||||
lines.append("#if !defined (__APPLE__) && !defined (__linux__)")
|
||||
lines.append(".section .rodata.embedded")
|
||||
lines.append("#endif")
|
||||
lines.append(f"\n.global {varname}")
|
||||
lines.append(f"{varname}:")
|
||||
lines.append(f"\n.global _binary_{varname}_start")
|
||||
lines.append(f"_binary_{varname}_start: /* for objcopy compatibility */")
|
||||
|
||||
# Format binary data as .byte lines (16 bytes per line)
|
||||
for i in range(0, len(data), 16):
|
||||
chunk = data[i:i + 16]
|
||||
hex_bytes = ", ".join(f"0x{b:02x}" for b in chunk)
|
||||
lines.append(f".byte {hex_bytes}")
|
||||
|
||||
lines.append(f"\n.global _binary_{varname}_end")
|
||||
lines.append(f"_binary_{varname}_end: /* for objcopy compatibility */")
|
||||
lines.append(f"\n.global {varname}_length")
|
||||
lines.append(f"{varname}_length:")
|
||||
lines.append(f".long {len(data)}")
|
||||
lines.append("")
|
||||
lines.append('#if defined (__linux__)')
|
||||
lines.append('.section .note.GNU-stack,"",@progbits')
|
||||
lines.append("#endif")
|
||||
|
||||
asm_content = "\n".join(lines) + "\n"
|
||||
|
||||
# Write to app build dir and bootloader build dir
|
||||
asm_path.write_text(asm_content)
|
||||
bootloader_dir = build_dir / "bootloader"
|
||||
if bootloader_dir.is_dir():
|
||||
bootloader_bin = bootloader_dir / "signature_verification_key.bin"
|
||||
bootloader_asm = bootloader_dir / "signature_verification_key.bin.S"
|
||||
shutil.copyfile(str(bin_path), str(bootloader_bin))
|
||||
bootloader_asm.write_text(asm_content)
|
||||
|
||||
|
||||
def sign_firmware(source, target, env):
|
||||
"""
|
||||
Sign the firmware binary using espsecure.py if signed OTA verification is enabled.
|
||||
@@ -55,9 +164,12 @@ def sign_firmware(source, target, env):
|
||||
env.Exit(1)
|
||||
return
|
||||
|
||||
# ESPHome only exposes RSA3072 and ECDSA256 (both Secure Boot V2 schemes),
|
||||
# so the espsecure signature version is always 2.
|
||||
sign_version = "2"
|
||||
# Determine espsecure signature version from the signing scheme:
|
||||
# V1 ECDSA (Secure Boot V1) uses --version 1, V2 RSA/ECDSA use --version 2.
|
||||
if sdkconfig.get("CONFIG_SECURE_SIGNED_APPS_ECDSA_SCHEME") == "y":
|
||||
sign_version = "1"
|
||||
else:
|
||||
sign_version = "2"
|
||||
|
||||
firmware_name = os.path.basename(env.subst("$PROGNAME")) + ".bin"
|
||||
firmware_path = build_dir / firmware_name
|
||||
@@ -217,6 +329,11 @@ def esp32_copy_ota_bin(source, target, env):
|
||||
print(f"Copied firmware to {new_file_name}")
|
||||
|
||||
|
||||
# Generate V1 ECDSA verification key files before build starts.
|
||||
# Workaround for PlatformIO not executing CMake custom commands that extract
|
||||
# the public key and generate the .S assembly file for Secure Boot V1.
|
||||
_generate_v1_verification_key(env) # noqa: F821
|
||||
|
||||
# Run signing first, then merge, then ota copy
|
||||
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", sign_firmware) # noqa: F821
|
||||
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", merge_factory_bin) # noqa: F821
|
||||
|
||||
@@ -22,6 +22,12 @@ struct NVSData {
|
||||
|
||||
static std::vector<NVSData> s_pending_save; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
||||
// open() runs from app_main() before the logger is initialized, so any failure
|
||||
// must be deferred until after global_logger is set. This is emitted from the
|
||||
// first make_preference() call, which runs from the generated setup() after
|
||||
// log->pre_setup() has run at EARLY_INIT priority.
|
||||
static esp_err_t s_open_err = ESP_OK; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
||||
bool ESP32PreferenceBackend::save(const uint8_t *data, size_t len) {
|
||||
// try find in pending saves and update that
|
||||
for (auto &obj : s_pending_save) {
|
||||
@@ -74,12 +80,14 @@ bool ESP32PreferenceBackend::load(uint8_t *data, size_t len) {
|
||||
}
|
||||
|
||||
void ESP32Preferences::open() {
|
||||
// Runs from app_main() before the logger is initialized; any logging here
|
||||
// must be deferred. See s_open_err and make_preference() below.
|
||||
nvs_flash_init();
|
||||
esp_err_t err = nvs_open("esphome", NVS_READWRITE, &this->nvs_handle);
|
||||
if (err == 0)
|
||||
return;
|
||||
|
||||
ESP_LOGW(TAG, "nvs_open failed: %s - erasing NVS", esp_err_to_name(err));
|
||||
s_open_err = err;
|
||||
nvs_flash_deinit();
|
||||
nvs_flash_erase();
|
||||
nvs_flash_init();
|
||||
@@ -91,6 +99,14 @@ void ESP32Preferences::open() {
|
||||
}
|
||||
|
||||
ESPPreferenceObject ESP32Preferences::make_preference(size_t length, uint32_t type) {
|
||||
if (s_open_err != ESP_OK) {
|
||||
if (this->nvs_handle == 0) {
|
||||
ESP_LOGW(TAG, "nvs_open failed: %s - NVS unavailable", esp_err_to_name(s_open_err));
|
||||
} else {
|
||||
ESP_LOGW(TAG, "nvs_open failed: %s - erased NVS", esp_err_to_name(s_open_err));
|
||||
}
|
||||
s_open_err = ESP_OK;
|
||||
}
|
||||
auto *pref = new ESP32PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
|
||||
pref->nvs_handle = this->nvs_handle;
|
||||
pref->key = type;
|
||||
|
||||
@@ -1,32 +1,38 @@
|
||||
/*
|
||||
* Linker wrap stubs for FILE*-based printf functions.
|
||||
* Linker wrap stubs for FILE*-based printf functions (newlib only).
|
||||
*
|
||||
* ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference
|
||||
* fprintf(), printf(), vprintf(), and vfprintf() which pull in the full
|
||||
* printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on
|
||||
* picolibc's vfprintf). This is a separate implementation from the one
|
||||
* used by snprintf/vsnprintf that handles FILE* stream I/O with buffering
|
||||
* and locking.
|
||||
* fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull
|
||||
* in _vfprintf_r (~11 KB) — a separate implementation from the one used
|
||||
* by snprintf/vsnprintf that handles FILE* stream I/O with buffering.
|
||||
*
|
||||
* ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(),
|
||||
* so the SDK's vprintf() path is dead code at runtime. The fprintf()
|
||||
* and printf() calls in SDK components are only in debug/assert paths
|
||||
* (gpio_dump_io_configuration, ringbuf diagnostics) that are either
|
||||
* GC'd or never called. Crash backtraces and panic output are
|
||||
* unaffected — they use esp_rom_printf() which is a ROM function
|
||||
* and does not go through libc.
|
||||
* unaffected; they use esp_rom_printf() which is a ROM function and
|
||||
* does not go through libc.
|
||||
*
|
||||
* These stubs redirect through vsnprintf() (which uses _svfprintf_r
|
||||
* already in the binary) and fwrite(), allowing the linker to
|
||||
* dead-code eliminate _vfprintf_r.
|
||||
* This wrap is newlib-only. On picolibc, vsnprintf is implemented as
|
||||
* vfprintf into a string-output FILE, so vfprintf is unconditionally
|
||||
* linked in by any caller of snprintf/vsnprintf and the wrap can never
|
||||
* elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF
|
||||
* on picolibc builds (IDF 6.0+ on all variants) so this file compiles
|
||||
* to nothing there; the #error below catches a desynchronised gate.
|
||||
*
|
||||
* Saves ~11 KB of flash.
|
||||
* Saves ~11 KB of flash on newlib.
|
||||
*
|
||||
* To disable these wraps, set enable_full_printf: true in the esp32
|
||||
* advanced config section.
|
||||
* To disable this wrap on newlib, set enable_full_printf: true in the
|
||||
* esp32 advanced config section.
|
||||
*/
|
||||
|
||||
#if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF)
|
||||
|
||||
#ifdef __PICOLIBC__
|
||||
#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF"
|
||||
#endif
|
||||
|
||||
#include <cstdarg>
|
||||
#include <cstdio>
|
||||
|
||||
@@ -34,6 +40,9 @@
|
||||
|
||||
namespace esphome::esp32 {}
|
||||
|
||||
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
|
||||
extern "C" {
|
||||
|
||||
static constexpr size_t PRINTF_BUFFER_SIZE = 512;
|
||||
|
||||
// These stubs are essentially dead code at runtime — ESPHome replaces the
|
||||
@@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) {
|
||||
return len;
|
||||
}
|
||||
|
||||
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
|
||||
extern "C" {
|
||||
|
||||
int __wrap_vprintf(const char *fmt, va_list ap) {
|
||||
char buf[PRINTF_BUFFER_SIZE];
|
||||
return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
|
||||
}
|
||||
|
||||
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
|
||||
char buf[PRINTF_BUFFER_SIZE];
|
||||
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
|
||||
}
|
||||
|
||||
int __wrap_printf(const char *fmt, ...) {
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
@@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) {
|
||||
return len;
|
||||
}
|
||||
|
||||
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
|
||||
char buf[PRINTF_BUFFER_SIZE];
|
||||
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
|
||||
}
|
||||
|
||||
int __wrap_fprintf(FILE *stream, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
|
||||
@@ -216,6 +216,7 @@ void ESP32TouchComponent::setup() {
|
||||
// Do initial oneshot scans to populate baseline values
|
||||
for (uint32_t i = 0; i < ONESHOT_SCAN_COUNT; i++) {
|
||||
err = touch_sensor_trigger_oneshot_scanning(this->sens_handle_, ONESHOT_SCAN_TIMEOUT_MS);
|
||||
App.feed_wdt(); // 3 scans with 2s timeout might exceed WDT, so feed it here to be safe
|
||||
if (err != ESP_OK) {
|
||||
ESP_LOGW(TAG, "Oneshot scan %" PRIu32 " failed: %s", i, esp_err_to_name(err));
|
||||
}
|
||||
|
||||
@@ -744,21 +744,28 @@ async def write_image(config, all_frames=False):
|
||||
if frame_count <= 1:
|
||||
_LOGGER.warning("Image file %s has no animation frames", path)
|
||||
|
||||
total_rows = height * frame_count
|
||||
encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha)
|
||||
if byte_order := config.get(CONF_BYTE_ORDER):
|
||||
# Check for valid type has already been done in validate_settings
|
||||
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
|
||||
# Encode each frame with its own encoder and concatenate. This keeps every
|
||||
# frame self-contained on disk (e.g. RGB565+alpha emits [RGB plane | alpha plane]
|
||||
# per frame) so animation frame stepping in image.cpp / animation.cpp stays
|
||||
# correct without needing to know the total frame count.
|
||||
byte_order = config.get(CONF_BYTE_ORDER)
|
||||
combined_data: list[int] = []
|
||||
encoder: ImageEncoder | None = None
|
||||
for frame_index in range(frame_count):
|
||||
image.seek(frame_index)
|
||||
encoder = IMAGE_TYPE[type](width, height, transparency, dither, invert_alpha)
|
||||
if byte_order is not None:
|
||||
# Check for valid type has already been done in validate_settings
|
||||
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
|
||||
pixels = encoder.convert(image.resize((width, height)), path).getdata()
|
||||
for row in range(height):
|
||||
for col in range(width):
|
||||
encoder.encode(pixels[row * width + col])
|
||||
encoder.end_row()
|
||||
encoder.end_image()
|
||||
combined_data.extend(encoder.data)
|
||||
|
||||
rhs = [HexInt(x) for x in encoder.data]
|
||||
rhs = [HexInt(x) for x in combined_data]
|
||||
prog_arr = cg.progmem_array(config[CONF_RAW_DATA_ID], rhs)
|
||||
image_type = get_image_type_enum(type)
|
||||
trans_value = get_transparency_enum(encoder.transparency)
|
||||
|
||||
@@ -766,32 +766,38 @@ void LD2412Component::get_distance_resolution_() { this->send_command_(CMD_QUERY
|
||||
void LD2412Component::query_light_control_() { this->send_command_(CMD_QUERY_LIGHT_CONTROL, nullptr, 0); }
|
||||
|
||||
void LD2412Component::set_basic_config() {
|
||||
uint8_t min_gate = 1;
|
||||
uint8_t max_gate = TOTAL_GATES;
|
||||
uint16_t timeout = DEFAULT_PRESENCE_TIMEOUT;
|
||||
uint8_t out_pin_level = 0x01;
|
||||
|
||||
#ifdef USE_NUMBER
|
||||
if (!this->min_distance_gate_number_->has_state() || !this->max_distance_gate_number_->has_state() ||
|
||||
!this->timeout_number_->has_state()) {
|
||||
return;
|
||||
if (this->min_distance_gate_number_ != nullptr) {
|
||||
if (!this->min_distance_gate_number_->has_state())
|
||||
return;
|
||||
min_gate = static_cast<int>(this->min_distance_gate_number_->state);
|
||||
}
|
||||
if (this->max_distance_gate_number_ != nullptr) {
|
||||
if (!this->max_distance_gate_number_->has_state())
|
||||
return;
|
||||
max_gate = static_cast<int>(this->max_distance_gate_number_->state) + 1;
|
||||
}
|
||||
if (this->timeout_number_ != nullptr) {
|
||||
if (!this->timeout_number_->has_state())
|
||||
return;
|
||||
timeout = static_cast<int>(this->timeout_number_->state);
|
||||
}
|
||||
#endif
|
||||
#ifdef USE_SELECT
|
||||
if (!this->out_pin_level_select_->has_state()) {
|
||||
return;
|
||||
if (this->out_pin_level_select_ != nullptr) {
|
||||
if (!this->out_pin_level_select_->has_state())
|
||||
return;
|
||||
out_pin_level = find_uint8(OUT_PIN_LEVELS_BY_STR, this->out_pin_level_select_->current_option().c_str());
|
||||
}
|
||||
#endif
|
||||
|
||||
uint8_t value[5] = {
|
||||
#ifdef USE_NUMBER
|
||||
lowbyte(static_cast<int>(this->min_distance_gate_number_->state)),
|
||||
lowbyte(static_cast<int>(this->max_distance_gate_number_->state) + 1),
|
||||
lowbyte(static_cast<int>(this->timeout_number_->state)),
|
||||
highbyte(static_cast<int>(this->timeout_number_->state)),
|
||||
#else
|
||||
1, TOTAL_GATES, DEFAULT_PRESENCE_TIMEOUT, 0,
|
||||
#endif
|
||||
#ifdef USE_SELECT
|
||||
find_uint8(OUT_PIN_LEVELS_BY_STR, this->out_pin_level_select_->current_option().c_str()),
|
||||
#else
|
||||
0x01, // Default value if not using select
|
||||
#endif
|
||||
lowbyte(min_gate), lowbyte(max_gate), lowbyte(timeout), highbyte(timeout), out_pin_level,
|
||||
};
|
||||
this->set_config_mode_(true);
|
||||
this->send_command_(CMD_BASIC_CONF, value, sizeof(value));
|
||||
|
||||
@@ -89,10 +89,12 @@
|
||||
id: hello_world_label_
|
||||
text: "Hello World!"
|
||||
align: center
|
||||
- obj:
|
||||
- container:
|
||||
id: hello_world_qrcode_
|
||||
outline_width: 0
|
||||
border_width: 0
|
||||
height: 100
|
||||
width: 100
|
||||
hidden: !lambda |-
|
||||
return lv_obj_get_width(lv_screen_active()) < 300 && lv_obj_get_height(lv_screen_active()) < 400;
|
||||
widgets:
|
||||
|
||||
@@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) {
|
||||
|
||||
#ifdef USE_LVGL_METER
|
||||
|
||||
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) {
|
||||
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) {
|
||||
auto *scale = lv_obj_get_parent(obj);
|
||||
auto min_value = lv_scale_get_range_min_value(scale);
|
||||
return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) +
|
||||
auto max_value = lv_scale_get_range_max_value(scale);
|
||||
value = clamp(value, min_value, max_value);
|
||||
return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) +
|
||||
lv_scale_get_rotation((scale))) %
|
||||
360;
|
||||
}
|
||||
|
||||
@@ -88,6 +88,12 @@ inline void lv_obj_set_style_bitmap_mask_src(lv_obj_t *obj, image::Image *image,
|
||||
inline void lv_obj_set_style_bg_image_src(lv_obj_t *obj, image::Image *image, lv_style_selector_t selector) {
|
||||
::lv_obj_set_style_bg_image_src(obj, image->get_lv_image_dsc(), selector);
|
||||
}
|
||||
inline void lv_style_set_bg_image_src(lv_style_t *style, image::Image *image) {
|
||||
::lv_style_set_bg_image_src(style, image->get_lv_image_dsc());
|
||||
}
|
||||
inline void lv_style_set_bitmap_mask_src(lv_style_t *style, image::Image *image) {
|
||||
::lv_style_set_bitmap_mask_src(style, image->get_lv_image_dsc());
|
||||
}
|
||||
#endif // USE_LVGL_IMAGE
|
||||
#ifdef USE_LVGL_ANIMIMG
|
||||
inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images) {
|
||||
@@ -106,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images
|
||||
#endif // USE_LVGL_ANIMIMG
|
||||
|
||||
#ifdef USE_LVGL_METER
|
||||
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value);
|
||||
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value);
|
||||
#endif
|
||||
|
||||
// Parent class for things that wrap an LVGL object
|
||||
|
||||
@@ -52,19 +52,23 @@ class KeyboardType(WidgetType):
|
||||
if mode := config.get(CONF_MODE):
|
||||
await w.set_property(CONF_MODE, await KEYBOARD_MODES.process(mode))
|
||||
if textarea := config.get(CONF_TEXTAREA):
|
||||
# If a textarea is configured, it must be generated before the keyboard can attach it.
|
||||
# If not yet configured, defer the attachment code.
|
||||
if not is_widget_completed(textarea):
|
||||
# Can only happen for an initial config, where the keyboard is configured before the
|
||||
# textarea, so it's ok to always emit into the global context
|
||||
async def add_textarea():
|
||||
async with LvContext():
|
||||
await w.set_property(
|
||||
CONF_TEXTAREA,
|
||||
(await get_widgets(config, CONF_TEXTAREA))[0].obj,
|
||||
)
|
||||
|
||||
async def add_textarea():
|
||||
async with LvContext():
|
||||
await w.set_property(
|
||||
CONF_TEXTAREA, (await get_widgets(config, CONF_TEXTAREA))[0].obj
|
||||
)
|
||||
|
||||
if is_widget_completed(textarea):
|
||||
await add_textarea()
|
||||
else:
|
||||
CORE.add_job(add_textarea)
|
||||
else:
|
||||
# Handles updates in automations, and properly ordered initial config. Code is generated
|
||||
# into the enclosing context (main or lambda)
|
||||
await w.set_property(
|
||||
CONF_TEXTAREA, (await get_widgets(config, CONF_TEXTAREA))[0].obj
|
||||
)
|
||||
|
||||
|
||||
keyboard_spec = KeyboardType()
|
||||
|
||||
@@ -22,7 +22,7 @@ from ..defines import (
|
||||
literal,
|
||||
)
|
||||
from ..lv_validation import animated, lv_int, size
|
||||
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj
|
||||
from ..lvcode import LocalVariable, lv, lv_assign, lv_expr, lv_obj, lv_Pvariable
|
||||
from ..schemas import container_schema, part_schema
|
||||
from ..types import LV_EVENT, LvType, ObjUpdateAction, lv_obj_t, lv_obj_t_ptr
|
||||
from . import Widget, WidgetType, add_widgets, get_widgets, set_obj_properties
|
||||
@@ -83,8 +83,8 @@ class TabviewType(WidgetType):
|
||||
await w.set_property("tab_bar_size", await size.process(config[CONF_SIZE]))
|
||||
for tab_conf in config[CONF_TABS]:
|
||||
w_id = tab_conf[CONF_ID]
|
||||
tab_obj = cg.Pvariable(w_id, cg.nullptr, type_=lv_tab_t)
|
||||
tab_widget = Widget.create(w_id, tab_obj, obj_spec)
|
||||
tab_obj = lv_Pvariable(lv_tab_t, w_id)
|
||||
tab_widget = Widget.create(w_id, tab_obj, obj_spec, tab_conf)
|
||||
lv_assign(tab_obj, lv_expr.tabview_add_tab(w.obj, tab_conf[CONF_NAME]))
|
||||
await set_obj_properties(tab_widget, tab_conf)
|
||||
await add_widgets(tab_widget, tab_conf)
|
||||
|
||||
@@ -37,7 +37,10 @@ void IRAM_ATTR MCP23016::gpio_intr(MCP23016 *arg) { arg->enable_loop_soon_any_co
|
||||
void MCP23016::loop() {
|
||||
// Invalidate cache at the start of each loop
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ from esphome import pins
|
||||
import esphome.codegen as cg
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_ALLOW_OTHER_USES,
|
||||
CONF_ID,
|
||||
CONF_INPUT,
|
||||
CONF_INTERRUPT,
|
||||
@@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = {
|
||||
"FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING,
|
||||
}
|
||||
|
||||
|
||||
def _validate_interrupt_pin(value):
|
||||
# The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR)
|
||||
# and installs a single ISR per GPIO, so neither inversion nor sharing is supported.
|
||||
value = pins.internal_gpio_input_pin_schema(value)
|
||||
if value.get(CONF_INVERTED):
|
||||
raise cv.Invalid(
|
||||
f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
|
||||
"the MCP23xxx INT line is fixed active-low"
|
||||
)
|
||||
if value.get(CONF_ALLOW_OTHER_USES):
|
||||
raise cv.Invalid(
|
||||
f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
|
||||
"sharing the interrupt pin between multiple MCP23xxx (or other components) "
|
||||
"is not implemented. Remove the interrupt_pin to fall back to polling."
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
MCP23XXX_CONFIG_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean,
|
||||
cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema,
|
||||
cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin,
|
||||
}
|
||||
).extend(cv.COMPONENT_SCHEMA)
|
||||
|
||||
|
||||
@@ -21,7 +21,10 @@ template<uint8_t N> class MCP23XXXBase : public Component, public gpio_expander:
|
||||
|
||||
void loop() override {
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,13 @@ namespace esphome::nextion {
|
||||
static const char *const TAG = "nextion.upload.arduino";
|
||||
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
|
||||
|
||||
// Timeout for display acknowledgment during TFT upload (ms).
|
||||
// A single value is used for all chunks; the happy path returns as soon as
|
||||
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
|
||||
// reports showed the previous 500ms steady-state value was too tight for
|
||||
// some firmware variants.
|
||||
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
|
||||
|
||||
// Followed guide
|
||||
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
|
||||
|
||||
@@ -80,14 +87,14 @@ int Nextion::upload_by_chunks_(HTTPClient &http_client, uint32_t &range_start) {
|
||||
recv_string.clear();
|
||||
this->write_array(buffer, buffer_size);
|
||||
App.feed_wdt();
|
||||
this->recv_ret_string_(recv_string, this->upload_first_chunk_sent_ ? 500 : 5000, true);
|
||||
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
|
||||
this->content_length_ -= read_len;
|
||||
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
|
||||
ESP_LOGD(TAG, "Upload: %0.2f%% (%" PRIu32 " left, heap: %" PRIu32 ")", upload_percentage, this->content_length_,
|
||||
EspClass::getFreeHeap());
|
||||
this->upload_first_chunk_sent_ = true;
|
||||
if (recv_string.empty()) {
|
||||
ESP_LOGW(TAG, "No response from display during upload");
|
||||
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
|
||||
allocator.deallocate(buffer, 4096);
|
||||
buffer = nullptr;
|
||||
return -1;
|
||||
|
||||
@@ -19,6 +19,13 @@ namespace esphome::nextion {
|
||||
static const char *const TAG = "nextion.upload.esp32";
|
||||
static constexpr size_t NEXTION_MAX_RESPONSE_LOG_BYTES = 16;
|
||||
|
||||
// Timeout for display acknowledgment during TFT upload (ms).
|
||||
// A single value is used for all chunks; the happy path returns as soon as
|
||||
// 0x05/0x08 arrives, so this only bounds failed-detection latency. Field
|
||||
// reports showed the previous 500ms steady-state value was too tight for
|
||||
// some firmware variants.
|
||||
static constexpr uint32_t NEXTION_UPLOAD_ACK_TIMEOUT_MS = 5000;
|
||||
|
||||
// Followed guide
|
||||
// https://unofficialnextion.com/t/nextion-upload-protocol-v1-2-the-fast-one/1044/2
|
||||
|
||||
@@ -96,7 +103,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
|
||||
recv_string.clear();
|
||||
this->write_array(buffer, buffer_size);
|
||||
App.feed_wdt();
|
||||
this->recv_ret_string_(recv_string, upload_first_chunk_sent_ ? 500 : 5000, true);
|
||||
this->recv_ret_string_(recv_string, NEXTION_UPLOAD_ACK_TIMEOUT_MS, true);
|
||||
this->content_length_ -= read_len;
|
||||
const float upload_percentage = 100.0f * (this->tft_size_ - this->content_length_) / this->tft_size_;
|
||||
#ifdef USE_PSRAM
|
||||
@@ -109,7 +116,7 @@ int Nextion::upload_by_chunks_(esp_http_client_handle_t http_client, uint32_t &r
|
||||
#endif
|
||||
upload_first_chunk_sent_ = true;
|
||||
if (recv_string.empty()) {
|
||||
ESP_LOGW(TAG, "No response from display during upload");
|
||||
ESP_LOGW(TAG, "No response from display after %" PRIu32 "ms", NEXTION_UPLOAD_ACK_TIMEOUT_MS);
|
||||
allocator.deallocate(buffer, 4096);
|
||||
buffer = nullptr;
|
||||
return -1;
|
||||
|
||||
@@ -62,7 +62,10 @@ void IRAM_ATTR PCA6416AComponent::gpio_intr(PCA6416AComponent *arg) { arg->enabl
|
||||
void PCA6416AComponent::loop() {
|
||||
// Invalidate cache at the start of each loop
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,8 +50,10 @@ void IRAM_ATTR PCA9554Component::gpio_intr(PCA9554Component *arg) { arg->enable_
|
||||
void PCA9554Component::loop() {
|
||||
// Invalidate the cache so the next digital_read() triggers a fresh I2C read
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Interrupt-driven: disable loop until next interrupt fires
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,8 +31,10 @@ void IRAM_ATTR PCF8574Component::gpio_intr(PCF8574Component *arg) { arg->enable_
|
||||
void PCF8574Component::loop() {
|
||||
// Invalidate the cache so the next digital_read() triggers a fresh I2C read
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Interrupt-driven: disable loop until next interrupt fires
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +82,10 @@ void PI4IOE5V6408Component::pin_mode(uint8_t pin, gpio::Flags flags) {
|
||||
|
||||
void PI4IOE5V6408Component::loop() {
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,6 +129,6 @@ async def to_code(config):
|
||||
async def sensor_template_publish_to_code(config, action_id, template_arg, args):
|
||||
paren = await cg.get_variable(config[CONF_ID])
|
||||
var = cg.new_Pvariable(action_id, template_arg, paren)
|
||||
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int32)
|
||||
template_ = await cg.templatable(config[CONF_VALUE], args, cg.int_)
|
||||
cg.add(var.set_value(template_))
|
||||
return var
|
||||
|
||||
@@ -57,7 +57,10 @@ void TCA9555Component::pin_mode(uint8_t pin, gpio::Flags flags) {
|
||||
}
|
||||
void TCA9555Component::loop() {
|
||||
this->reset_pin_cache_();
|
||||
if (this->interrupt_pin_ != nullptr) {
|
||||
// Only disable the loop once INT has actually gone HIGH. Input transitions that straddle the
|
||||
// I2C read leave INT asserted without re-firing a falling edge, which would strand us with
|
||||
// stale state forever; keep looping until the line is released so we self-heal.
|
||||
if (this->interrupt_pin_ != nullptr && this->interrupt_pin_->digital_read()) {
|
||||
this->disable_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import errno
|
||||
from importlib import resources
|
||||
import logging
|
||||
|
||||
@@ -74,6 +75,12 @@ def _load_tzdata(iana_key: str) -> bytes | None:
|
||||
return (resources.files(package) / resource).read_bytes()
|
||||
except (FileNotFoundError, ModuleNotFoundError, IsADirectoryError):
|
||||
return None
|
||||
except OSError as e:
|
||||
# Windows raises EINVAL for paths with NTFS-illegal chars (e.g. '<'/'>'
|
||||
# in POSIX TZ strings like "<+08>-8" that validate_tz feeds back here).
|
||||
if e.errno == errno.EINVAL:
|
||||
return None
|
||||
raise
|
||||
|
||||
|
||||
def _extract_tz_string(tzfile: bytes) -> str:
|
||||
|
||||
@@ -116,12 +116,23 @@ CONFIG_SCHEMA = cv.ensure_list(
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
# The output chunk pool/queue are compile-time-sized templates shared by all
|
||||
# USBUartChannel instances, so use the largest buffer_size across every channel
|
||||
# of every device. Each chunk is 64 bytes (USB FS MPS); add one extra slot
|
||||
# because LockFreeQueue<T,N> is a ring buffer that wastes one entry.
|
||||
max_buffer_size = max(
|
||||
channel[CONF_BUFFER_SIZE]
|
||||
for device in config
|
||||
for channel in device[CONF_CHANNELS]
|
||||
)
|
||||
output_chunk_count = max_buffer_size // 64 + 1
|
||||
cg.add_define("USB_UART_OUTPUT_CHUNK_COUNT", output_chunk_count)
|
||||
|
||||
for device in config:
|
||||
var = await register_usb_client(device)
|
||||
for index, channel in enumerate(device[CONF_CHANNELS]):
|
||||
chvar = cg.new_Pvariable(channel[CONF_ID], index, channel[CONF_BUFFER_SIZE])
|
||||
await cg.register_parented(chvar, var)
|
||||
cg.add(chvar.set_rx_buffer_size(channel[CONF_BUFFER_SIZE]))
|
||||
cg.add(chvar.set_stop_bits(channel[CONF_STOP_BITS]))
|
||||
cg.add(chvar.set_data_bits(channel[CONF_DATA_BITS]))
|
||||
cg.add(chvar.set_parity(channel[CONF_PARITY]))
|
||||
|
||||
@@ -132,8 +132,9 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon
|
||||
friend class USBUartTypeCH34X;
|
||||
|
||||
public:
|
||||
// Number of output chunk slots per channel (8 × 64 bytes = 512 bytes peak, lazily allocated)
|
||||
static constexpr uint8_t USB_OUTPUT_CHUNK_COUNT = 8;
|
||||
// Number of output chunk slots per channel, derived from buffer_size config.
|
||||
// Computed as ceil(buffer_size / 64) + 1 in Python codegen; defaults to 5 (256 / 64 + 1).
|
||||
static constexpr uint8_t USB_OUTPUT_CHUNK_COUNT = USB_UART_OUTPUT_CHUNK_COUNT;
|
||||
|
||||
USBUartChannel(uint8_t index, uint16_t buffer_size) : index_(index), input_buffer_(RingBuffer(buffer_size)) {}
|
||||
void write_array(const uint8_t *data, size_t len) override;
|
||||
|
||||
@@ -1570,6 +1570,8 @@ void WiFiComponent::check_connecting_finished(uint32_t now) {
|
||||
#endif
|
||||
|
||||
this->state_ = WIFI_COMPONENT_STATE_STA_CONNECTED;
|
||||
// Refresh is_connected() cache; loop()'s refresh ran before this transition.
|
||||
this->update_connected_state_();
|
||||
this->num_retried_ = 0;
|
||||
this->print_connect_params_();
|
||||
|
||||
|
||||
@@ -948,6 +948,8 @@ void WiFiComponent::process_pending_callbacks_() {
|
||||
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
|
||||
if (this->pending_.disconnect) {
|
||||
this->pending_.disconnect = false;
|
||||
// Refresh is_connected() cache here, not in the SDK callback (sys context).
|
||||
this->update_connected_state_();
|
||||
this->notify_disconnect_state_listeners_();
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -179,7 +179,10 @@ void WiFiComponent::wifi_pre_setup_() {
|
||||
#endif // USE_WIFI_AP
|
||||
|
||||
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
|
||||
// cfg.nvs_enable = false;
|
||||
if (global_preferences->nvs_handle == 0) {
|
||||
ESP_LOGW(TAG, "starting wifi without nvs");
|
||||
cfg.nvs_enable = false;
|
||||
}
|
||||
err = esp_wifi_init(&cfg);
|
||||
if (err != ERR_OK) {
|
||||
ESP_LOGE(TAG, "esp_wifi_init failed: %s", esp_err_to_name(err));
|
||||
@@ -796,6 +799,8 @@ void WiFiComponent::wifi_process_event_(IDFWiFiEvent *data) {
|
||||
s_sta_connected = false;
|
||||
s_sta_connecting = false;
|
||||
error_from_callback_ = true;
|
||||
// Refresh is_connected() cache; error_from_callback_ makes it false.
|
||||
this->update_connected_state_();
|
||||
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
|
||||
this->notify_disconnect_state_listeners_();
|
||||
#endif
|
||||
|
||||
@@ -536,6 +536,8 @@ void WiFiComponent::wifi_process_event_(LTWiFiEvent *event) {
|
||||
this->error_from_callback_ = true;
|
||||
}
|
||||
|
||||
// Refresh is_connected() cache; sta_state_/error_from_callback_ make it false.
|
||||
this->update_connected_state_();
|
||||
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
|
||||
this->notify_disconnect_state_listeners_();
|
||||
#endif
|
||||
|
||||
@@ -342,6 +342,8 @@ void WiFiComponent::wifi_loop_() {
|
||||
s_sta_was_connected = false;
|
||||
s_sta_had_ip = false;
|
||||
ESP_LOGV(TAG, "Disconnected");
|
||||
// Refresh is_connected() cache; driver link status reports disconnected.
|
||||
this->update_connected_state_();
|
||||
#ifdef USE_WIFI_CONNECT_STATE_LISTENERS
|
||||
this->notify_disconnect_state_listeners_();
|
||||
#endif
|
||||
|
||||
@@ -4,7 +4,7 @@ from enum import Enum
|
||||
|
||||
from esphome.enum import StrEnum
|
||||
|
||||
__version__ = "2026.4.1"
|
||||
__version__ = "2026.4.4"
|
||||
|
||||
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
||||
VALID_SUBSTITUTIONS_CHARACTERS = (
|
||||
|
||||
@@ -284,6 +284,7 @@
|
||||
#define ESPHOME_WIFI_POWER_SAVE_LISTENERS 2
|
||||
#define USE_WIFI_RUNTIME_POWER_SAVE
|
||||
#define USB_HOST_MAX_REQUESTS 16
|
||||
#define USB_UART_OUTPUT_CHUNK_COUNT 5
|
||||
|
||||
#ifdef USE_ARDUINO
|
||||
#define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 3, 7)
|
||||
|
||||
@@ -606,33 +606,43 @@ def Pvariable(id_: ID, rhs: SafeExpType, type_: "MockObj" = None) -> "MockObj":
|
||||
if isinstance(rhs, MockObj) and rhs.is_new_expr:
|
||||
# For 'new' allocations, use placement new into static storage
|
||||
# to avoid heap fragmentation on embedded devices.
|
||||
the_type = id_.type
|
||||
#
|
||||
# Storage must be sized and aligned for the actual instantiated class,
|
||||
# which may be a subclass of id_.type (e.g. `cv.declare_id(BaseClass)`
|
||||
# combined with `SubClass.new()` — used by ili9xxx, waveshare_epaper,
|
||||
# etc. to select a model-specific constructor). Using id_.type would
|
||||
# run the base-class default constructor instead, silently losing any
|
||||
# subclass initialization. Template args live on the CallExpression
|
||||
# and are re-emitted below.
|
||||
call_expr = rhs.base
|
||||
assert isinstance(call_expr, CallExpression), (
|
||||
f"Expected CallExpression for placement new, got {type(call_expr)}"
|
||||
)
|
||||
actual_type = rhs.new_type if rhs.new_type is not None else id_.type
|
||||
if call_expr.template_args is not None:
|
||||
actual_type = f"{actual_type}{call_expr.template_args}"
|
||||
pointer_type = id_.type
|
||||
# Extract component namespace from type for memory analysis attribution
|
||||
component_ns = _extract_component_ns(str(the_type))
|
||||
component_ns = _extract_component_ns(str(actual_type))
|
||||
storage_name = f"{component_ns}__{id_.id}__pstorage"
|
||||
|
||||
# Declare aligned byte array for the object storage
|
||||
CORE.add_global(
|
||||
RawStatement(
|
||||
f"alignas({the_type}) static unsigned char {storage_name}[sizeof({the_type})];"
|
||||
f"alignas({actual_type}) static unsigned char {storage_name}[sizeof({actual_type})];"
|
||||
)
|
||||
)
|
||||
# Pointer declaration uses id_.type to preserve the declared base-class
|
||||
# pointer type for downstream callers (polymorphism through base ptr).
|
||||
CORE.add_global(
|
||||
AssignmentExpression(
|
||||
f"static {the_type}",
|
||||
f"static {pointer_type}",
|
||||
"*const ",
|
||||
id_,
|
||||
MockObj(f"reinterpret_cast<{the_type} *>({storage_name})"),
|
||||
MockObj(f"reinterpret_cast<{pointer_type} *>({storage_name})"),
|
||||
)
|
||||
)
|
||||
# Extract args from the CallExpression and rebuild as placement new.
|
||||
# Template args are already encoded in the_type (e.g. GlobalsComponent<int>),
|
||||
# so we only pass the constructor args, not template_args.
|
||||
call_expr = rhs.base
|
||||
assert isinstance(call_expr, CallExpression), (
|
||||
f"Expected CallExpression for placement new, got {type(call_expr)}"
|
||||
)
|
||||
placement_new = CallExpression(f"new({id_.id}) {the_type}", *call_expr.args)
|
||||
placement_new = CallExpression(f"new({id_.id}) {actual_type}", *call_expr.args)
|
||||
CORE.add(ExpressionStatement(placement_new))
|
||||
else:
|
||||
decl = VariableDeclarationExpression(id_.type, "*", id_, static=True)
|
||||
@@ -869,12 +879,16 @@ class MockObj(Expression):
|
||||
Mostly consists of magic methods that allow ESPHome's codegen syntax.
|
||||
"""
|
||||
|
||||
__slots__ = ("base", "op", "is_new_expr")
|
||||
__slots__ = ("base", "op", "is_new_expr", "new_type")
|
||||
|
||||
def __init__(self, base, op=".", is_new_expr=False) -> None:
|
||||
def __init__(self, base, op=".", is_new_expr=False, new_type=None) -> None:
|
||||
self.base = base
|
||||
self.op = op
|
||||
self.is_new_expr = is_new_expr
|
||||
# For `is_new_expr=True` objects, `new_type` holds the class name being
|
||||
# constructed (e.g. "ili9xxx::ILI9XXXST7789V"). Needed by Pvariable so
|
||||
# placement new uses the actual subclass rather than id_.type.
|
||||
self.new_type = new_type
|
||||
|
||||
def __getattr__(self, attr: str) -> "MockObj":
|
||||
# prevent python dunder methods being replaced by mock objects
|
||||
@@ -889,7 +903,9 @@ class MockObj(Expression):
|
||||
|
||||
def __call__(self, *args: SafeExpType) -> "MockObj":
|
||||
call = CallExpression(self.base, *args)
|
||||
return MockObj(call, self.op, is_new_expr=self.is_new_expr)
|
||||
return MockObj(
|
||||
call, self.op, is_new_expr=self.is_new_expr, new_type=self.new_type
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return str(self.base)
|
||||
@@ -903,7 +919,7 @@ class MockObj(Expression):
|
||||
|
||||
@property
|
||||
def new(self) -> "MockObj":
|
||||
return MockObj(f"new {self.base}", "->", is_new_expr=True)
|
||||
return MockObj(f"new {self.base}", "->", is_new_expr=True, new_type=self.base)
|
||||
|
||||
def template(self, *args: SafeExpType) -> "MockObj":
|
||||
"""Apply template parameters to this object."""
|
||||
|
||||
@@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [
|
||||
]
|
||||
|
||||
|
||||
def _strip_win_long_path_prefix(path: str) -> str:
|
||||
r"""Strip the Windows extended-length path prefix from ``path``.
|
||||
|
||||
Handles both forms documented at
|
||||
https://learn.microsoft.com/windows/win32/fileio/naming-a-file:
|
||||
|
||||
* ``\\?\C:\path\to\file`` -> ``C:\path\to\file``
|
||||
* ``\\?\UNC\server\share\path`` -> ``\\server\share\path``
|
||||
|
||||
The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with
|
||||
``sys.executable`` already prefixed with ``\\?\``. That prefix propagates
|
||||
into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from
|
||||
the environment, falling back to ``os.path.normpath(sys.executable)``)
|
||||
and ends up baked into SCons-emitted command lines for build steps such
|
||||
as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand
|
||||
the ``\\?\`` prefix, so the build fails with
|
||||
"The system cannot find the path specified." Stripping the prefix early
|
||||
keeps the path shell-quotable.
|
||||
|
||||
No-op on non-Windows platforms.
|
||||
"""
|
||||
if sys.platform != "win32":
|
||||
return path
|
||||
if path.startswith("\\\\?\\UNC\\"):
|
||||
# \\?\UNC\server\share\... -> \\server\share\...
|
||||
return "\\\\" + path[len("\\\\?\\UNC\\") :]
|
||||
if path.startswith("\\\\?\\"):
|
||||
return path[len("\\\\?\\") :]
|
||||
return path
|
||||
|
||||
|
||||
def run_platformio_cli(*args, **kwargs) -> str | int:
|
||||
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
|
||||
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
|
||||
@@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
|
||||
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
|
||||
# Increase uv retry count to handle transient network errors (default is 3)
|
||||
os.environ.setdefault("UV_HTTP_RETRIES", "10")
|
||||
cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args)
|
||||
# Strip the Windows extended-length path prefix from sys.executable so it
|
||||
# doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted
|
||||
# command lines run through cmd.exe.
|
||||
python_exe = _strip_win_long_path_prefix(sys.executable)
|
||||
if python_exe != sys.executable:
|
||||
# Only override PYTHONEXEPATH when we actually stripped a prefix.
|
||||
# PlatformIO's get_pythonexe_path() reads this and falls back to
|
||||
# sys.executable otherwise; setting it unconditionally would clobber
|
||||
# a user-provided value (or the unmodified path on platforms that
|
||||
# don't need the strip).
|
||||
os.environ["PYTHONEXEPATH"] = python_exe
|
||||
cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args)
|
||||
|
||||
return run_external_process(*cmd, **kwargs)
|
||||
|
||||
|
||||
@@ -94,13 +94,29 @@ def safe_print(message="", end="\n"):
|
||||
except UnicodeEncodeError:
|
||||
pass
|
||||
|
||||
# Fall back to the stream's actual encoding (e.g. cp1252 on Windows
|
||||
# redirected pipes). Use "backslashreplace" so unencodable code points
|
||||
# like the wifi signal-bar block characters (U+2582..U+2588) become
|
||||
# readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print``
|
||||
# never receives a ``bytes`` object (which would render as a ``b'...'``
|
||||
# repr).
|
||||
encoding = sys.stdout.encoding or "ascii"
|
||||
try:
|
||||
print(message.encode("utf-8", "backslashreplace"), end=end)
|
||||
print(
|
||||
message.encode(encoding, "backslashreplace").decode(encoding),
|
||||
end=end,
|
||||
)
|
||||
return
|
||||
except UnicodeEncodeError:
|
||||
try:
|
||||
print(message.encode("ascii", "backslashreplace"), end=end)
|
||||
except UnicodeEncodeError:
|
||||
print("Cannot print line because of invalid locale!")
|
||||
pass
|
||||
|
||||
try:
|
||||
print(
|
||||
message.encode("ascii", "backslashreplace").decode("ascii"),
|
||||
end=end,
|
||||
)
|
||||
except UnicodeEncodeError:
|
||||
print("Cannot print line because of invalid locale!")
|
||||
|
||||
|
||||
def safe_input(prompt=""):
|
||||
|
||||
0
tests/component_tests/ili9xxx/__init__.py
Normal file
0
tests/component_tests/ili9xxx/__init__.py
Normal file
20
tests/component_tests/ili9xxx/config/ili9xxx_test.yaml
Normal file
20
tests/component_tests/ili9xxx/config/ili9xxx_test.yaml
Normal file
@@ -0,0 +1,20 @@
|
||||
esphome:
|
||||
name: test
|
||||
|
||||
esp32:
|
||||
board: esp32dev
|
||||
framework:
|
||||
type: arduino
|
||||
|
||||
spi:
|
||||
clk_pin: GPIO18
|
||||
mosi_pin: GPIO23
|
||||
|
||||
display:
|
||||
- platform: ili9xxx
|
||||
id: tft_display
|
||||
model: ST7789V
|
||||
cs_pin: GPIO5
|
||||
dc_pin: GPIO17
|
||||
reset_pin: GPIO16
|
||||
invert_colors: false
|
||||
31
tests/component_tests/ili9xxx/test_ili9xxx.py
Normal file
31
tests/component_tests/ili9xxx/test_ili9xxx.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""Tests for the ili9xxx component."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_ili9xxx_placement_new_uses_model_subclass(
|
||||
generate_main: Callable[[str | Path], str],
|
||||
component_config_path: Callable[[str], Path],
|
||||
) -> None:
|
||||
"""Regression test for ili9xxx picking the right constructor under placement new.
|
||||
|
||||
ili9xxx declares the ID as the base ``ILI9XXXDisplay`` but constructs a
|
||||
model-specific subclass (e.g. ``ILI9XXXST7789V``) via ``MODELS[...].new()``.
|
||||
Pvariable must emit placement new for the subclass — otherwise the base
|
||||
default constructor runs and the panel is left with a null init sequence
|
||||
and 0x0 dimensions, producing a silent blank screen.
|
||||
"""
|
||||
main_cpp = generate_main(component_config_path("ili9xxx_test.yaml"))
|
||||
|
||||
# Storage is sized for the subclass so the full object fits.
|
||||
assert "sizeof(ili9xxx::ILI9XXXST7789V)" in main_cpp
|
||||
assert "alignas(ili9xxx::ILI9XXXST7789V)" in main_cpp
|
||||
# Pointer is declared as the base type for polymorphism.
|
||||
assert "static ili9xxx::ILI9XXXDisplay *const tft_display" in main_cpp
|
||||
# Placement new runs the subclass constructor — this is the actual regression fix.
|
||||
assert "new(tft_display) ili9xxx::ILI9XXXST7789V()" in main_cpp
|
||||
# Base-class default constructor must NOT be used.
|
||||
assert "new(tft_display) ili9xxx::ILI9XXXDisplay()" not in main_cpp
|
||||
@@ -7,10 +7,12 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from PIL import Image as PILImage
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.image import (
|
||||
CONF_ALPHA_CHANNEL,
|
||||
CONF_INVERT_ALPHA,
|
||||
CONF_OPAQUE,
|
||||
CONF_TRANSPARENCY,
|
||||
@@ -411,3 +413,70 @@ async def test_svg_with_mm_dimensions_succeeds(
|
||||
assert 30 < height < 50, (
|
||||
f"Height should be around 39 pixels for 10mm at 100dpi, got {height}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rgb565_alpha_animation_layout_per_frame(
|
||||
tmp_path: Path,
|
||||
mock_progmem_array: MagicMock,
|
||||
) -> None:
|
||||
"""RGB565+alpha animations must store each frame as a self-contained
|
||||
[RGB plane | alpha plane] block. Animation::update_data_start_ steps frames
|
||||
with a single per-frame stride, so any cross-frame layout (all RGB then all
|
||||
alpha) makes the C++ alpha read land in the next frame's RGB bytes — that
|
||||
was the regression behind issue #15999.
|
||||
"""
|
||||
# Build a 2-frame APNG where each frame is a solid color with a known
|
||||
# alpha. APNG preserves full RGBA per pixel (GIF only has 1-bit alpha so
|
||||
# round-tripping mid-range alpha values does not work). Frame 0 is fully
|
||||
# opaque red, frame 1 is fully transparent blue.
|
||||
width = 4
|
||||
height = 3
|
||||
frame0 = PILImage.new("RGBA", (width, height), (255, 0, 0, 0xFF))
|
||||
frame1 = PILImage.new("RGBA", (width, height), (0, 0, 255, 0x00))
|
||||
apng_path = tmp_path / "anim.png"
|
||||
frame0.save(
|
||||
apng_path,
|
||||
format="PNG",
|
||||
save_all=True,
|
||||
append_images=[frame1],
|
||||
duration=100,
|
||||
loop=0,
|
||||
)
|
||||
|
||||
config = {
|
||||
CONF_FILE: str(apng_path),
|
||||
CONF_TYPE: "RGB565",
|
||||
CONF_TRANSPARENCY: CONF_ALPHA_CHANNEL,
|
||||
CONF_DITHER: "NONE",
|
||||
CONF_INVERT_ALPHA: False,
|
||||
CONF_RAW_DATA_ID: "test_raw_data_id",
|
||||
}
|
||||
|
||||
_, _, _, _, _, frame_count = await write_image(config, all_frames=True)
|
||||
assert frame_count == 2
|
||||
|
||||
# Recover the bytes handed to progmem_array. Signature is (id_, rhs).
|
||||
_, raw_data = mock_progmem_array.call_args.args
|
||||
data = [int(x) for x in raw_data]
|
||||
|
||||
rgb_size = width * height * 2
|
||||
alpha_size = width * height
|
||||
frame_size = rgb_size + alpha_size
|
||||
assert len(data) == frame_size * frame_count, (
|
||||
"RGB565+alpha animation buffer must be (RGB + alpha) per frame, not "
|
||||
"all RGB followed by all alpha"
|
||||
)
|
||||
|
||||
# Frame 0: RGB plane is red, alpha plane is 0xFF. Frame 1: alpha plane is
|
||||
# 0x00. If the layout regresses to [all RGB | all alpha], the alpha bytes
|
||||
# would all land at the tail of the buffer and the per-frame slices below
|
||||
# would point at RGB565 noise instead.
|
||||
frame0_alpha = data[rgb_size : rgb_size + alpha_size]
|
||||
frame1_alpha = data[frame_size + rgb_size : frame_size + rgb_size + alpha_size]
|
||||
assert all(a == 0xFF for a in frame0_alpha), (
|
||||
f"Frame 0 alpha plane should be opaque, got {frame0_alpha}"
|
||||
)
|
||||
assert all(a == 0x00 for a in frame1_alpha), (
|
||||
f"Frame 1 alpha plane should be transparent, got {frame1_alpha}"
|
||||
)
|
||||
|
||||
@@ -91,6 +91,24 @@ api:
|
||||
- float_arr.size()
|
||||
- string_arr[0].c_str()
|
||||
- string_arr.size()
|
||||
# Test array + string args used after a non-synchronous action (delay).
|
||||
# The default non-owning types (StringRef, const FixedVector&) would
|
||||
# dangle once rx_buf_ is reused, and FixedVector is non-copyable so
|
||||
# DelayAction's lambda capture would fail to compile. The api codegen
|
||||
# must fall back to owning std::string / std::vector here.
|
||||
- action: array_with_delay
|
||||
variables:
|
||||
name: string
|
||||
int_arr: int[]
|
||||
string_arr: string[]
|
||||
then:
|
||||
- delay: 20ms
|
||||
- logger.log:
|
||||
format: "Delayed: %s (%u ints, %u strings)"
|
||||
args:
|
||||
- name.c_str()
|
||||
- int_arr.size()
|
||||
- string_arr.size()
|
||||
# Test ContinuationAction (IfAction with then/else branches)
|
||||
- action: test_if_action
|
||||
variables:
|
||||
|
||||
@@ -4,3 +4,9 @@ esphome:
|
||||
- deep_sleep.prevent
|
||||
- delay: 1s
|
||||
- deep_sleep.allow
|
||||
- if:
|
||||
condition:
|
||||
lambda: 'return false;'
|
||||
then:
|
||||
- deep_sleep.enter:
|
||||
sleep_duration: 60min
|
||||
|
||||
7
tests/components/esp32/dummy_signing_key_v1_ecdsa.pem
Normal file
7
tests/components/esp32/dummy_signing_key_v1_ecdsa.pem
Normal file
@@ -0,0 +1,7 @@
|
||||
*** DO NOT USE THIS KEY...EVER ***
|
||||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIEZIp96p7Z7QN6vxOFE5FdRNm535vW81Ax07KnGxVjiMoAoGCCqGSM49
|
||||
AwEHoUQDQgAEK+fBQDn1Q+r5lGwcDoMUgeg2Aq16LLrLUz7xWI6mS0PUClzolDIo
|
||||
eaV/Pfjl7zAvkbQQsZq3rTNnr1eGAk5P+A==
|
||||
-----END EC PRIVATE KEY-----
|
||||
*** DO NOT USE THIS KEY...EVER ***
|
||||
10
tests/components/esp32/test-signed_ota_v1.esp32-idf.yaml
Normal file
10
tests/components/esp32/test-signed_ota_v1.esp32-idf.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
esp32:
|
||||
variant: esp32
|
||||
framework:
|
||||
type: esp-idf
|
||||
advanced:
|
||||
signed_ota_verification:
|
||||
signing_key: ../../components/esp32/dummy_signing_key_v1_ecdsa.pem
|
||||
signing_scheme: ecdsa_v1
|
||||
|
||||
<<: !include common.yaml
|
||||
@@ -1,6 +1,11 @@
|
||||
"""Tests for time component cron expression parsing."""
|
||||
|
||||
from esphome.components.time import _parse_cron_part
|
||||
import errno
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome.components.time import _load_tzdata, _parse_cron_part, validate_tz
|
||||
|
||||
|
||||
def test_star_slash_seconds() -> None:
|
||||
@@ -78,3 +83,63 @@ def test_range() -> None:
|
||||
|
||||
def test_single_value() -> None:
|
||||
assert _parse_cron_part("30", 0, 59, {}) == {30}
|
||||
|
||||
|
||||
def _mock_resources_with_error(error: Exception) -> MagicMock:
|
||||
"""Return a mock of importlib.resources.files where read_bytes raises error."""
|
||||
leaf = MagicMock()
|
||||
leaf.read_bytes.side_effect = error
|
||||
package = MagicMock()
|
||||
package.__truediv__.return_value = leaf
|
||||
return MagicMock(return_value=package)
|
||||
|
||||
|
||||
def test_load_tzdata_returns_none_on_windows_einval() -> None:
|
||||
"""On Windows, opening a tzdata path with NTFS-illegal chars raises OSError(EINVAL).
|
||||
|
||||
Regression test for crash when the system TZ resolves to a POSIX string like
|
||||
"<+08>-8" (Asia/Shanghai, IST, etc.) and is fed back into _load_tzdata by
|
||||
validate_tz to check whether it is also a valid IANA key.
|
||||
"""
|
||||
err = OSError(errno.EINVAL, "Invalid argument")
|
||||
with patch(
|
||||
"esphome.components.time.resources.files",
|
||||
_mock_resources_with_error(err),
|
||||
):
|
||||
assert _load_tzdata("<+08>-8") is None
|
||||
|
||||
|
||||
def test_load_tzdata_propagates_unexpected_oserror() -> None:
|
||||
"""Unrelated OSErrors (e.g. PermissionError) must not be swallowed."""
|
||||
with (
|
||||
patch(
|
||||
"esphome.components.time.resources.files",
|
||||
_mock_resources_with_error(
|
||||
PermissionError(errno.EACCES, "Permission denied")
|
||||
),
|
||||
),
|
||||
pytest.raises(PermissionError),
|
||||
):
|
||||
_load_tzdata("Some/Zone")
|
||||
|
||||
|
||||
def test_load_tzdata_returns_none_on_file_not_found() -> None:
|
||||
"""Existing behavior: missing tz file returns None rather than raising."""
|
||||
with patch(
|
||||
"esphome.components.time.resources.files",
|
||||
_mock_resources_with_error(FileNotFoundError()),
|
||||
):
|
||||
assert _load_tzdata("Not/A/Zone") is None
|
||||
|
||||
|
||||
def test_validate_tz_accepts_posix_string_when_read_bytes_raises_einval() -> None:
|
||||
"""validate_tz must not crash when _load_tzdata hits the Windows EINVAL path.
|
||||
|
||||
Simulates the Windows case where the auto-detected POSIX TZ string is fed
|
||||
back through _load_tzdata and the underlying read_bytes raises errno 22.
|
||||
"""
|
||||
with patch(
|
||||
"esphome.components.time.resources.files",
|
||||
_mock_resources_with_error(OSError(errno.EINVAL, "Invalid argument")),
|
||||
):
|
||||
assert validate_tz("<+08>-8") == "<+08>-8"
|
||||
|
||||
@@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables(
|
||||
assert "arg" in args
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("platform", "input_path", "expected"),
|
||||
[
|
||||
# win32: drive-letter extended-length prefix is stripped
|
||||
(
|
||||
"win32",
|
||||
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
|
||||
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
|
||||
),
|
||||
# win32: UNC extended-length prefix is translated to a regular UNC path
|
||||
(
|
||||
"win32",
|
||||
"\\\\?\\UNC\\server\\share\\python.exe",
|
||||
"\\\\server\\share\\python.exe",
|
||||
),
|
||||
# win32: paths without the prefix are returned unchanged
|
||||
(
|
||||
"win32",
|
||||
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
|
||||
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
|
||||
),
|
||||
# non-win32: prefix is left alone (no-op)
|
||||
("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"),
|
||||
("darwin", "/usr/bin/python3", "/usr/bin/python3"),
|
||||
],
|
||||
)
|
||||
def test_strip_win_long_path_prefix(
|
||||
platform: str, input_path: str, expected: str
|
||||
) -> None:
|
||||
r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32."""
|
||||
with patch("esphome.platformio_api.sys.platform", platform):
|
||||
assert platformio_api._strip_win_long_path_prefix(input_path) == expected
|
||||
|
||||
|
||||
def test_run_platformio_cli_strips_win_long_path_prefix(
|
||||
setup_core: Path, mock_run_external_process: Mock
|
||||
) -> None:
|
||||
r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess.
|
||||
|
||||
The NSIS-installed esphome.exe launcher starts Python with
|
||||
``sys.executable`` already prefixed by the extended-length path marker.
|
||||
That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and
|
||||
break SCons-emitted command lines run through ``cmd.exe``.
|
||||
"""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
prefixed_exe = (
|
||||
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
|
||||
)
|
||||
stripped_exe = (
|
||||
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
|
||||
)
|
||||
|
||||
with (
|
||||
patch.dict(os.environ, {}, clear=False),
|
||||
patch("esphome.platformio_api.sys.platform", "win32"),
|
||||
patch("esphome.platformio_api.sys.executable", prefixed_exe),
|
||||
):
|
||||
# Pop any pre-existing PYTHONEXEPATH so the assertion below reflects
|
||||
# what run_platformio_cli set, not whatever the test runner's
|
||||
# environment happened to contain.
|
||||
os.environ.pop("PYTHONEXEPATH", None)
|
||||
mock_run_external_process.return_value = 0
|
||||
platformio_api.run_platformio_cli("test", "arg")
|
||||
|
||||
# The subprocess is invoked with the stripped executable path.
|
||||
mock_run_external_process.assert_called_once()
|
||||
args = mock_run_external_process.call_args[0]
|
||||
assert args[0] == stripped_exe
|
||||
# PYTHONEXEPATH is exported with the stripped path so PlatformIO's
|
||||
# get_pythonexe_path() picks it up in the subprocess.
|
||||
assert os.environ["PYTHONEXEPATH"] == stripped_exe
|
||||
|
||||
|
||||
def test_run_platformio_cli_does_not_set_pythonexepath_without_strip(
|
||||
setup_core: Path, mock_run_external_process: Mock
|
||||
) -> None:
|
||||
r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix.
|
||||
|
||||
Setting it unconditionally would clobber a user-provided value (or
|
||||
interfere with non-Windows tooling that has no prefix to strip).
|
||||
"""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
plain_exe = "/usr/bin/python3"
|
||||
|
||||
with (
|
||||
patch.dict(os.environ, {}, clear=False),
|
||||
patch("esphome.platformio_api.sys.platform", "linux"),
|
||||
patch("esphome.platformio_api.sys.executable", plain_exe),
|
||||
):
|
||||
os.environ.pop("PYTHONEXEPATH", None)
|
||||
mock_run_external_process.return_value = 0
|
||||
platformio_api.run_platformio_cli("test", "arg")
|
||||
|
||||
mock_run_external_process.assert_called_once()
|
||||
args = mock_run_external_process.call_args[0]
|
||||
assert args[0] == plain_exe
|
||||
assert "PYTHONEXEPATH" not in os.environ
|
||||
|
||||
|
||||
def test_run_platformio_cli_run_builds_command(
|
||||
setup_core: Path, mock_run_platformio_cli: Mock
|
||||
) -> None:
|
||||
|
||||
@@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None:
|
||||
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
|
||||
assert result.device_count == 0
|
||||
assert result.permission_error is False
|
||||
|
||||
|
||||
class TestSafePrint:
|
||||
"""Tests for ``safe_print`` and its UnicodeEncodeError fallback chain."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Default ``CORE.dashboard`` to False so each test starts hermetic."""
|
||||
from esphome.core import CORE
|
||||
|
||||
monkeypatch.setattr(CORE, "dashboard", False)
|
||||
|
||||
def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
"""ASCII-only messages take the fast path through native ``print``."""
|
||||
util.safe_print("hello world")
|
||||
assert capsys.readouterr().out == "hello world\n"
|
||||
|
||||
def test_prints_unicode_on_utf8_stdout(
|
||||
self, capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
"""Non-ASCII goes straight through when stdout can encode it."""
|
||||
util.safe_print("bars: \u2582\u2584\u2586\u2588")
|
||||
assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n"
|
||||
|
||||
def test_dashboard_escapes_esc_byte(
|
||||
self,
|
||||
capsys: pytest.CaptureFixture[str],
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``."""
|
||||
from esphome.core import CORE
|
||||
|
||||
monkeypatch.setattr(CORE, "dashboard", True)
|
||||
util.safe_print("\033[0;32mhi\033[0m")
|
||||
assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n"
|
||||
|
||||
def test_fallback_writes_string_not_bytes_repr(
|
||||
self, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Regression: cp1252 fallback must produce a printable str, not ``b'...'``.
|
||||
|
||||
On Windows, when stdout is a redirected pipe (e.g. the dashboard),
|
||||
Python uses cp1252, which cannot encode the wifi signal-bar block
|
||||
characters (U+2582..U+2588). The previous fallback path called
|
||||
``print(message.encode(...))`` with a ``bytes`` object, which
|
||||
Python's ``print`` rendered as a literal ``b'...'`` repr — visible
|
||||
in the user's dashboard output. The fix re-encodes through the
|
||||
stream's encoding with ``backslashreplace`` and decodes back to
|
||||
``str``.
|
||||
"""
|
||||
buf = io.BytesIO()
|
||||
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
|
||||
monkeypatch.setattr(sys, "stdout", cp1252_stream)
|
||||
|
||||
util.safe_print("bars: \u2582\u2584\u2586\u2588 done")
|
||||
cp1252_stream.flush()
|
||||
output = buf.getvalue().decode("cp1252")
|
||||
|
||||
# Output is a clean line, not the bytes repr.
|
||||
assert not output.startswith("b'")
|
||||
assert "b'bars" not in output
|
||||
# Unencodable codepoints become readable backslash escapes.
|
||||
assert "\\u2582\\u2584\\u2586\\u2588" in output
|
||||
# Encodable parts survive unchanged.
|
||||
assert "bars: " in output
|
||||
assert " done" in output
|
||||
assert output.endswith("\n")
|
||||
|
||||
def test_fallback_with_dashboard_escaped_message(
|
||||
self, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Dashboard ESC escaping + cp1252 fallback compose correctly."""
|
||||
from esphome.core import CORE
|
||||
|
||||
monkeypatch.setattr(CORE, "dashboard", True)
|
||||
buf = io.BytesIO()
|
||||
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
|
||||
monkeypatch.setattr(sys, "stdout", cp1252_stream)
|
||||
|
||||
util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m")
|
||||
cp1252_stream.flush()
|
||||
output = buf.getvalue().decode("cp1252")
|
||||
|
||||
# Dashboard escaping turned ESC into literal "\033" (5 chars), which
|
||||
# cp1252 can encode, so it survives the round-trip verbatim.
|
||||
assert "\\033[0;32m" in output
|
||||
assert "\\033[0m" in output
|
||||
# Block characters became backslash escapes via backslashreplace.
|
||||
assert "\\u2582\\u2584\\u2586\\u2588" in output
|
||||
|
||||
def test_final_message_when_locale_is_invalid(
|
||||
self,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
capsys: pytest.CaptureFixture[str],
|
||||
) -> None:
|
||||
"""If every encoding path fails, surface the locale-error sentinel."""
|
||||
original_print = print
|
||||
call_count = 0
|
||||
|
||||
def fake_print(*args: Any, **kwargs: Any) -> None:
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
# The first three calls are: native print, stream-encoding
|
||||
# fallback, ASCII fallback. Make all three raise so we reach
|
||||
# the final sentinel "Cannot print line..." which is expected
|
||||
# to succeed (no encoding required).
|
||||
if call_count <= 3:
|
||||
raise UnicodeEncodeError("ascii", "x", 0, 1, "boom")
|
||||
original_print(*args, **kwargs)
|
||||
|
||||
monkeypatch.setattr("builtins.print", fake_print)
|
||||
util.safe_print("x")
|
||||
assert call_count == 4
|
||||
assert (
|
||||
capsys.readouterr().out == "Cannot print line because of invalid locale!\n"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user