Merge pull request #16240 from esphome/bump-2026.4.4

2026.4.4
This commit is contained in:
Jesse Hills
2026-05-05 14:21:38 +12:00
committed by GitHub
15 changed files with 404 additions and 50 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2026.4.3
PROJECT_NUMBER = 2026.4.4
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -598,7 +598,7 @@ async def component_resume_action_to_code(
comp = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, comp)
if CONF_UPDATE_INTERVAL in config:
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32)
cg.add(var.set_update_interval(template_))
return var

View File

@@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_(
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Owning element type for each YAML service variable type. Used to derive both
# the zero-copy native types and the owning fallback types below.
_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = {
"bool": cg.bool_,
"int": cg.int32,
"float": cg.float_,
"string": cg.std_string,
}
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
# Scalars are passed by value; string uses a non-owning view into rx_buf_.
**_SERVICE_ARG_SCALAR_TYPES,
"string": cg.StringRef,
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
"string[]": cg.FixedVector.template(cg.std_string)
.operator("const")
.operator("ref"),
# Arrays are passed as non-owning const references into rx_buf_.
**{
f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref")
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
# Owning fallback types used when the action chain contains non-synchronous actions
# (delay, wait_until, script.wait, etc.). The default non-owning types reference
# storage in the receive buffer, which is reused once the synchronous portion of
# the chain returns. FixedVector is also non-copyable, so the deferred lambda
# capture in DelayAction::play_complex would fail to compile.
SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = {
"string": cg.std_string,
**{
f"{name}[]": cg.std_vector.template(t)
for name, t in _SERVICE_ARG_SCALAR_TYPES.items()
},
}
CONF_ENCRYPTION = "encryption"
CONF_BATCH_DELAY = "batch_delay"
@@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None:
func_args.append((cg.bool_, "return_response"))
# Check if action chain has non-synchronous actions that would make
# non-owning StringRef dangle (rx_buf_ reused after delay)
# non-owning args (StringRef, const FixedVector&) dangle once the
# rx_buf_ is reused after a delay/wait_until/script.wait/etc. The
# FixedVector references would also fail to compile because they
# are non-copyable and DelayAction captures args by value.
has_non_synchronous = automation.has_non_synchronous_actions(
conf.get(CONF_THEN, [])
)
service_arg_names: list[str] = []
for name, var_ in conf[CONF_VARIABLES].items():
native = SERVICE_ARG_NATIVE_TYPES[var_]
# Fall back to std::string for string args if non-synchronous actions exist
if has_non_synchronous and native is cg.StringRef:
native = cg.std_string
if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES:
native = SERVICE_ARG_FALLBACK_TYPES[var_]
else:
native = SERVICE_ARG_NATIVE_TYPES[var_]
service_template_args.append(native)
func_args.append((native, name))
service_arg_names.append(name)

View File

@@ -20,6 +20,7 @@ import contextlib
from esphome.const import CONF_KEY, CONF_PORT, __version__
from esphome.core import CORE
from esphome.platformio_api import process_stacktrace
from esphome.util import safe_print
from . import CONF_ENCRYPTION
@@ -60,7 +61,6 @@ async def async_run_logs(
noise_psk=noise_psk,
addresses=addresses, # Pass all addresses for automatic retry
)
dashboard = CORE.dashboard
backtrace_state = False
# Try platform-specific stacktrace handler first, fall back to generic
@@ -82,7 +82,11 @@ async def async_run_logs(
f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
)
for parsed_msg in parse_log_message(text, timestamp):
print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg)
# safe_print handles the dashboard \033 escaping and falls back
# to backslashreplace encoding on stdouts that can't represent
# the wifi signal-bar block characters (Windows redirected
# cp1252 pipe).
safe_print(parsed_msg)
for raw_line in text.splitlines():
if platform_process_stacktrace:
backtrace_state = platform_process_stacktrace(

View File

@@ -1740,7 +1740,17 @@ async def to_code(config):
# Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r
# (~11 KB). See printf_stubs.cpp for implementation.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]:
#
# The wrap is only beneficial against newlib. Picolibc's tinystdio
# implements vsnprintf by building a string-output FILE and calling
# vfprintf, so vfprintf is unconditionally linked in by any caller
# of snprintf/vsnprintf — effectively every build — and the wrap
# saves nothing while costing ~170 B of shim. IDF 5.x defaults to
# newlib on every variant; IDF 6.0+ switches to picolibc on every
# variant.
if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version(
6, 0, 0
):
cg.add_define("USE_FULL_PRINTF")
else:
for symbol in ("vprintf", "printf", "fprintf", "vfprintf"):

View File

@@ -1,32 +1,38 @@
/*
* Linker wrap stubs for FILE*-based printf functions.
* Linker wrap stubs for FILE*-based printf functions (newlib only).
*
* ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference
* fprintf(), printf(), vprintf(), and vfprintf() which pull in the full
* printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on
* picolibc's vfprintf). This is a separate implementation from the one
* used by snprintf/vsnprintf that handles FILE* stream I/O with buffering
* and locking.
* fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull
* in _vfprintf_r (~11 KB) — a separate implementation from the one used
* by snprintf/vsnprintf that handles FILE* stream I/O with buffering.
*
* ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(),
* so the SDK's vprintf() path is dead code at runtime. The fprintf()
* and printf() calls in SDK components are only in debug/assert paths
* (gpio_dump_io_configuration, ringbuf diagnostics) that are either
* GC'd or never called. Crash backtraces and panic output are
* unaffected they use esp_rom_printf() which is a ROM function
* and does not go through libc.
* unaffected; they use esp_rom_printf() which is a ROM function and
* does not go through libc.
*
* These stubs redirect through vsnprintf() (which uses _svfprintf_r
* already in the binary) and fwrite(), allowing the linker to
* dead-code eliminate _vfprintf_r.
* This wrap is newlib-only. On picolibc, vsnprintf is implemented as
* vfprintf into a string-output FILE, so vfprintf is unconditionally
* linked in by any caller of snprintf/vsnprintf and the wrap can never
* elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF
* on picolibc builds (IDF 6.0+ on all variants) so this file compiles
* to nothing there; the #error below catches a desynchronised gate.
*
* Saves ~11 KB of flash.
* Saves ~11 KB of flash on newlib.
*
* To disable these wraps, set enable_full_printf: true in the esp32
* advanced config section.
* To disable this wrap on newlib, set enable_full_printf: true in the
* esp32 advanced config section.
*/
#if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF)
#ifdef __PICOLIBC__
#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF"
#endif
#include <cstdarg>
#include <cstdio>
@@ -34,6 +40,9 @@
namespace esphome::esp32 {}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
static constexpr size_t PRINTF_BUFFER_SIZE = 512;
// These stubs are essentially dead code at runtime — ESPHome replaces the
@@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) {
return len;
}
// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming)
extern "C" {
int __wrap_vprintf(const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_printf(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
@@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) {
return len;
}
int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) {
char buf[PRINTF_BUFFER_SIZE];
return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap));
}
int __wrap_fprintf(FILE *stream, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);

View File

@@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) {
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) {
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) {
auto *scale = lv_obj_get_parent(obj);
auto min_value = lv_scale_get_range_min_value(scale);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) +
auto max_value = lv_scale_get_range_max_value(scale);
value = clamp(value, min_value, max_value);
return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) +
lv_scale_get_rotation((scale))) %
360;
}

View File

@@ -112,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector<image::Image *> images
#endif // USE_LVGL_ANIMIMG
#ifdef USE_LVGL_METER
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value);
int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value);
#endif
// Parent class for things that wrap an LVGL object

View File

@@ -2,6 +2,7 @@ from esphome import pins
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
CONF_ALLOW_OTHER_USES,
CONF_ID,
CONF_INPUT,
CONF_INTERRUPT,
@@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = {
"FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING,
}
def _validate_interrupt_pin(value):
# The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR)
# and installs a single ISR per GPIO, so neither inversion nor sharing is supported.
value = pins.internal_gpio_input_pin_schema(value)
if value.get(CONF_INVERTED):
raise cv.Invalid(
f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"the MCP23xxx INT line is fixed active-low"
)
if value.get(CONF_ALLOW_OTHER_USES):
raise cv.Invalid(
f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; "
"sharing the interrupt pin between multiple MCP23xxx (or other components) "
"is not implemented. Remove the interrupt_pin to fall back to polling."
)
return value
MCP23XXX_CONFIG_SCHEMA = cv.Schema(
{
cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean,
cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema,
cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin,
}
).extend(cv.COMPONENT_SCHEMA)

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2026.4.3"
__version__ = "2026.4.4"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [
]
def _strip_win_long_path_prefix(path: str) -> str:
r"""Strip the Windows extended-length path prefix from ``path``.
Handles both forms documented at
https://learn.microsoft.com/windows/win32/fileio/naming-a-file:
* ``\\?\C:\path\to\file`` -> ``C:\path\to\file``
* ``\\?\UNC\server\share\path`` -> ``\\server\share\path``
The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with
``sys.executable`` already prefixed with ``\\?\``. That prefix propagates
into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from
the environment, falling back to ``os.path.normpath(sys.executable)``)
and ends up baked into SCons-emitted command lines for build steps such
as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand
the ``\\?\`` prefix, so the build fails with
"The system cannot find the path specified." Stripping the prefix early
keeps the path shell-quotable.
No-op on non-Windows platforms.
"""
if sys.platform != "win32":
return path
if path.startswith("\\\\?\\UNC\\"):
# \\?\UNC\server\share\... -> \\server\share\...
return "\\\\" + path[len("\\\\?\\UNC\\") :]
if path.startswith("\\\\?\\"):
return path[len("\\\\?\\") :]
return path
def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
@@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
# Increase uv retry count to handle transient network errors (default is 3)
os.environ.setdefault("UV_HTTP_RETRIES", "10")
cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args)
# Strip the Windows extended-length path prefix from sys.executable so it
# doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted
# command lines run through cmd.exe.
python_exe = _strip_win_long_path_prefix(sys.executable)
if python_exe != sys.executable:
# Only override PYTHONEXEPATH when we actually stripped a prefix.
# PlatformIO's get_pythonexe_path() reads this and falls back to
# sys.executable otherwise; setting it unconditionally would clobber
# a user-provided value (or the unmodified path on platforms that
# don't need the strip).
os.environ["PYTHONEXEPATH"] = python_exe
cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args)
return run_external_process(*cmd, **kwargs)

View File

@@ -94,13 +94,29 @@ def safe_print(message="", end="\n"):
except UnicodeEncodeError:
pass
# Fall back to the stream's actual encoding (e.g. cp1252 on Windows
# redirected pipes). Use "backslashreplace" so unencodable code points
# like the wifi signal-bar block characters (U+2582..U+2588) become
# readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print``
# never receives a ``bytes`` object (which would render as a ``b'...'``
# repr).
encoding = sys.stdout.encoding or "ascii"
try:
print(message.encode("utf-8", "backslashreplace"), end=end)
print(
message.encode(encoding, "backslashreplace").decode(encoding),
end=end,
)
return
except UnicodeEncodeError:
try:
print(message.encode("ascii", "backslashreplace"), end=end)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
pass
try:
print(
message.encode("ascii", "backslashreplace").decode("ascii"),
end=end,
)
except UnicodeEncodeError:
print("Cannot print line because of invalid locale!")
def safe_input(prompt=""):

View File

@@ -91,6 +91,24 @@ api:
- float_arr.size()
- string_arr[0].c_str()
- string_arr.size()
# Test array + string args used after a non-synchronous action (delay).
# The default non-owning types (StringRef, const FixedVector&) would
# dangle once rx_buf_ is reused, and FixedVector is non-copyable so
# DelayAction's lambda capture would fail to compile. The api codegen
# must fall back to owning std::string / std::vector here.
- action: array_with_delay
variables:
name: string
int_arr: int[]
string_arr: string[]
then:
- delay: 20ms
- logger.log:
format: "Delayed: %s (%u ints, %u strings)"
args:
- name.c_str()
- int_arr.size()
- string_arr.size()
# Test ContinuationAction (IfAction with then/else branches)
- action: test_if_action
variables:

View File

@@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables(
assert "arg" in args
@pytest.mark.parametrize(
("platform", "input_path", "expected"),
[
# win32: drive-letter extended-length prefix is stripped
(
"win32",
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# win32: UNC extended-length prefix is translated to a regular UNC path
(
"win32",
"\\\\?\\UNC\\server\\share\\python.exe",
"\\\\server\\share\\python.exe",
),
# win32: paths without the prefix are returned unchanged
(
"win32",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe",
),
# non-win32: prefix is left alone (no-op)
("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"),
("darwin", "/usr/bin/python3", "/usr/bin/python3"),
],
)
def test_strip_win_long_path_prefix(
platform: str, input_path: str, expected: str
) -> None:
r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32."""
with patch("esphome.platformio_api.sys.platform", platform):
assert platformio_api._strip_win_long_path_prefix(input_path) == expected
def test_run_platformio_cli_strips_win_long_path_prefix(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess.
The NSIS-installed esphome.exe launcher starts Python with
``sys.executable`` already prefixed by the extended-length path marker.
That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and
break SCons-emitted command lines run through ``cmd.exe``.
"""
CORE.build_path = str(setup_core / "build" / "test")
prefixed_exe = (
"\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
stripped_exe = (
"C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe"
)
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "win32"),
patch("esphome.platformio_api.sys.executable", prefixed_exe),
):
# Pop any pre-existing PYTHONEXEPATH so the assertion below reflects
# what run_platformio_cli set, not whatever the test runner's
# environment happened to contain.
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# The subprocess is invoked with the stripped executable path.
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == stripped_exe
# PYTHONEXEPATH is exported with the stripped path so PlatformIO's
# get_pythonexe_path() picks it up in the subprocess.
assert os.environ["PYTHONEXEPATH"] == stripped_exe
def test_run_platformio_cli_does_not_set_pythonexepath_without_strip(
setup_core: Path, mock_run_external_process: Mock
) -> None:
r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix.
Setting it unconditionally would clobber a user-provided value (or
interfere with non-Windows tooling that has no prefix to strip).
"""
CORE.build_path = str(setup_core / "build" / "test")
plain_exe = "/usr/bin/python3"
with (
patch.dict(os.environ, {}, clear=False),
patch("esphome.platformio_api.sys.platform", "linux"),
patch("esphome.platformio_api.sys.executable", plain_exe),
):
os.environ.pop("PYTHONEXEPATH", None)
mock_run_external_process.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
mock_run_external_process.assert_called_once()
args = mock_run_external_process.call_args[0]
assert args[0] == plain_exe
assert "PYTHONEXEPATH" not in os.environ
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:

View File

@@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None:
result = util.detect_rp2040_bootsel("/usr/bin/picotool")
assert result.device_count == 0
assert result.permission_error is False
class TestSafePrint:
"""Tests for ``safe_print`` and its UnicodeEncodeError fallback chain."""
@pytest.fixture(autouse=True)
def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Default ``CORE.dashboard`` to False so each test starts hermetic."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", False)
def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None:
"""ASCII-only messages take the fast path through native ``print``."""
util.safe_print("hello world")
assert capsys.readouterr().out == "hello world\n"
def test_prints_unicode_on_utf8_stdout(
self, capsys: pytest.CaptureFixture[str]
) -> None:
"""Non-ASCII goes straight through when stdout can encode it."""
util.safe_print("bars: \u2582\u2584\u2586\u2588")
assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n"
def test_dashboard_escapes_esc_byte(
self,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
util.safe_print("\033[0;32mhi\033[0m")
assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n"
def test_fallback_writes_string_not_bytes_repr(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Regression: cp1252 fallback must produce a printable str, not ``b'...'``.
On Windows, when stdout is a redirected pipe (e.g. the dashboard),
Python uses cp1252, which cannot encode the wifi signal-bar block
characters (U+2582..U+2588). The previous fallback path called
``print(message.encode(...))`` with a ``bytes`` object, which
Python's ``print`` rendered as a literal ``b'...'`` repr — visible
in the user's dashboard output. The fix re-encodes through the
stream's encoding with ``backslashreplace`` and decodes back to
``str``.
"""
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("bars: \u2582\u2584\u2586\u2588 done")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Output is a clean line, not the bytes repr.
assert not output.startswith("b'")
assert "b'bars" not in output
# Unencodable codepoints become readable backslash escapes.
assert "\\u2582\\u2584\\u2586\\u2588" in output
# Encodable parts survive unchanged.
assert "bars: " in output
assert " done" in output
assert output.endswith("\n")
def test_fallback_with_dashboard_escaped_message(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Dashboard ESC escaping + cp1252 fallback compose correctly."""
from esphome.core import CORE
monkeypatch.setattr(CORE, "dashboard", True)
buf = io.BytesIO()
cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict")
monkeypatch.setattr(sys, "stdout", cp1252_stream)
util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m")
cp1252_stream.flush()
output = buf.getvalue().decode("cp1252")
# Dashboard escaping turned ESC into literal "\033" (5 chars), which
# cp1252 can encode, so it survives the round-trip verbatim.
assert "\\033[0;32m" in output
assert "\\033[0m" in output
# Block characters became backslash escapes via backslashreplace.
assert "\\u2582\\u2584\\u2586\\u2588" in output
def test_final_message_when_locale_is_invalid(
self,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
"""If every encoding path fails, surface the locale-error sentinel."""
original_print = print
call_count = 0
def fake_print(*args: Any, **kwargs: Any) -> None:
nonlocal call_count
call_count += 1
# The first three calls are: native print, stream-encoding
# fallback, ASCII fallback. Make all three raise so we reach
# the final sentinel "Cannot print line..." which is expected
# to succeed (no encoding required).
if call_count <= 3:
raise UnicodeEncodeError("ascii", "x", 0, 1, "boom")
original_print(*args, **kwargs)
monkeypatch.setattr("builtins.print", fake_print)
util.safe_print("x")
assert call_count == 4
assert (
capsys.readouterr().out == "Cannot print line because of invalid locale!\n"
)