diff --git a/Doxyfile b/Doxyfile index 0e6a845ed8..2e86f54e96 100644 --- a/Doxyfile +++ b/Doxyfile @@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 2026.4.3 +PROJECT_NUMBER = 2026.4.4 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/esphome/automation.py b/esphome/automation.py index b4dcc41995..bfbfd58d8a 100644 --- a/esphome/automation.py +++ b/esphome/automation.py @@ -598,7 +598,7 @@ async def component_resume_action_to_code( comp = await cg.get_variable(config[CONF_ID]) var = cg.new_Pvariable(action_id, template_arg, comp) if CONF_UPDATE_INTERVAL in config: - template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int) + template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, cg.uint32) cg.add(var.set_update_interval(template_)) return var diff --git a/esphome/components/api/__init__.py b/esphome/components/api/__init__.py index 84589d540d..623da2247e 100644 --- a/esphome/components/api/__init__.py +++ b/esphome/components/api/__init__.py @@ -72,17 +72,35 @@ APIUnregisterServiceCallAction = api_ns.class_( UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger) ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument") -SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = { +# Owning element type for each YAML service variable type. Used to derive both +# the zero-copy native types and the owning fallback types below. +_SERVICE_ARG_SCALAR_TYPES: dict[str, MockObj] = { "bool": cg.bool_, "int": cg.int32, "float": cg.float_, + "string": cg.std_string, +} +SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = { + # Scalars are passed by value; string uses a non-owning view into rx_buf_. + **_SERVICE_ARG_SCALAR_TYPES, "string": cg.StringRef, - "bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"), - "int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"), - "float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"), - "string[]": cg.FixedVector.template(cg.std_string) - .operator("const") - .operator("ref"), + # Arrays are passed as non-owning const references into rx_buf_. + **{ + f"{name}[]": cg.FixedVector.template(t).operator("const").operator("ref") + for name, t in _SERVICE_ARG_SCALAR_TYPES.items() + }, +} +# Owning fallback types used when the action chain contains non-synchronous actions +# (delay, wait_until, script.wait, etc.). The default non-owning types reference +# storage in the receive buffer, which is reused once the synchronous portion of +# the chain returns. FixedVector is also non-copyable, so the deferred lambda +# capture in DelayAction::play_complex would fail to compile. +SERVICE_ARG_FALLBACK_TYPES: dict[str, MockObj] = { + "string": cg.std_string, + **{ + f"{name}[]": cg.std_vector.template(t) + for name, t in _SERVICE_ARG_SCALAR_TYPES.items() + }, } CONF_ENCRYPTION = "encryption" CONF_BATCH_DELAY = "batch_delay" @@ -382,17 +400,20 @@ async def to_code(config: ConfigType) -> None: func_args.append((cg.bool_, "return_response")) # Check if action chain has non-synchronous actions that would make - # non-owning StringRef dangle (rx_buf_ reused after delay) + # non-owning args (StringRef, const FixedVector&) dangle once the + # rx_buf_ is reused after a delay/wait_until/script.wait/etc. The + # FixedVector references would also fail to compile because they + # are non-copyable and DelayAction captures args by value. has_non_synchronous = automation.has_non_synchronous_actions( conf.get(CONF_THEN, []) ) service_arg_names: list[str] = [] for name, var_ in conf[CONF_VARIABLES].items(): - native = SERVICE_ARG_NATIVE_TYPES[var_] - # Fall back to std::string for string args if non-synchronous actions exist - if has_non_synchronous and native is cg.StringRef: - native = cg.std_string + if has_non_synchronous and var_ in SERVICE_ARG_FALLBACK_TYPES: + native = SERVICE_ARG_FALLBACK_TYPES[var_] + else: + native = SERVICE_ARG_NATIVE_TYPES[var_] service_template_args.append(native) func_args.append((native, name)) service_arg_names.append(name) diff --git a/esphome/components/api/client.py b/esphome/components/api/client.py index 0c6c569c7d..0982ca905b 100644 --- a/esphome/components/api/client.py +++ b/esphome/components/api/client.py @@ -20,6 +20,7 @@ import contextlib from esphome.const import CONF_KEY, CONF_PORT, __version__ from esphome.core import CORE from esphome.platformio_api import process_stacktrace +from esphome.util import safe_print from . import CONF_ENCRYPTION @@ -60,7 +61,6 @@ async def async_run_logs( noise_psk=noise_psk, addresses=addresses, # Pass all addresses for automatic retry ) - dashboard = CORE.dashboard backtrace_state = False # Try platform-specific stacktrace handler first, fall back to generic @@ -82,7 +82,11 @@ async def async_run_logs( f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]" ) for parsed_msg in parse_log_message(text, timestamp): - print(parsed_msg.replace("\033", "\\033") if dashboard else parsed_msg) + # safe_print handles the dashboard \033 escaping and falls back + # to backslashreplace encoding on stdouts that can't represent + # the wifi signal-bar block characters (Windows redirected + # cp1252 pipe). + safe_print(parsed_msg) for raw_line in text.splitlines(): if platform_process_stacktrace: backtrace_state = platform_process_stacktrace( diff --git a/esphome/components/esp32/__init__.py b/esphome/components/esp32/__init__.py index 77b405a449..30acbc3e41 100644 --- a/esphome/components/esp32/__init__.py +++ b/esphome/components/esp32/__init__.py @@ -1740,7 +1740,17 @@ async def to_code(config): # Wrap FILE*-based printf functions to eliminate newlib's _vfprintf_r # (~11 KB). See printf_stubs.cpp for implementation. - if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF]: + # + # The wrap is only beneficial against newlib. Picolibc's tinystdio + # implements vsnprintf by building a string-output FILE and calling + # vfprintf, so vfprintf is unconditionally linked in by any caller + # of snprintf/vsnprintf — effectively every build — and the wrap + # saves nothing while costing ~170 B of shim. IDF 5.x defaults to + # newlib on every variant; IDF 6.0+ switches to picolibc on every + # variant. + if conf[CONF_ADVANCED][CONF_ENABLE_FULL_PRINTF] or idf_version() >= cv.Version( + 6, 0, 0 + ): cg.add_define("USE_FULL_PRINTF") else: for symbol in ("vprintf", "printf", "fprintf", "vfprintf"): diff --git a/esphome/components/esp32/printf_stubs.cpp b/esphome/components/esp32/printf_stubs.cpp index 386fbbd79d..489c503942 100644 --- a/esphome/components/esp32/printf_stubs.cpp +++ b/esphome/components/esp32/printf_stubs.cpp @@ -1,32 +1,38 @@ /* - * Linker wrap stubs for FILE*-based printf functions. + * Linker wrap stubs for FILE*-based printf functions (newlib only). * * ESP-IDF SDK components (gpio driver, ringbuf, log_write) reference - * fprintf(), printf(), vprintf(), and vfprintf() which pull in the full - * printf implementation (~11 KB on newlib's _vfprintf_r, ~2.8 KB on - * picolibc's vfprintf). This is a separate implementation from the one - * used by snprintf/vsnprintf that handles FILE* stream I/O with buffering - * and locking. + * fprintf(), printf(), vprintf(), and vfprintf(), which on newlib pull + * in _vfprintf_r (~11 KB) — a separate implementation from the one used + * by snprintf/vsnprintf that handles FILE* stream I/O with buffering. * * ESPHome replaces the ESP-IDF log handler via esp_log_set_vprintf_(), * so the SDK's vprintf() path is dead code at runtime. The fprintf() * and printf() calls in SDK components are only in debug/assert paths * (gpio_dump_io_configuration, ringbuf diagnostics) that are either * GC'd or never called. Crash backtraces and panic output are - * unaffected — they use esp_rom_printf() which is a ROM function - * and does not go through libc. + * unaffected; they use esp_rom_printf() which is a ROM function and + * does not go through libc. * - * These stubs redirect through vsnprintf() (which uses _svfprintf_r - * already in the binary) and fwrite(), allowing the linker to - * dead-code eliminate _vfprintf_r. + * This wrap is newlib-only. On picolibc, vsnprintf is implemented as + * vfprintf into a string-output FILE, so vfprintf is unconditionally + * linked in by any caller of snprintf/vsnprintf and the wrap can never + * elide it — it just adds shim cost. Codegen forces USE_FULL_PRINTF + * on picolibc builds (IDF 6.0+ on all variants) so this file compiles + * to nothing there; the #error below catches a desynchronised gate. * - * Saves ~11 KB of flash. + * Saves ~11 KB of flash on newlib. * - * To disable these wraps, set enable_full_printf: true in the esp32 - * advanced config section. + * To disable this wrap on newlib, set enable_full_printf: true in the + * esp32 advanced config section. */ #if defined(USE_ESP_IDF) && !defined(USE_FULL_PRINTF) + +#ifdef __PICOLIBC__ +#error "printf wrap is net-negative on picolibc; codegen should set USE_FULL_PRINTF" +#endif + #include #include @@ -34,6 +40,9 @@ namespace esphome::esp32 {} +// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming) +extern "C" { + static constexpr size_t PRINTF_BUFFER_SIZE = 512; // These stubs are essentially dead code at runtime — ESPHome replaces the @@ -55,14 +64,16 @@ static int write_printf_buffer(FILE *stream, char *buf, int len) { return len; } -// NOLINTBEGIN(bugprone-reserved-identifier,cert-dcl37-c,cert-dcl51-cpp,readability-identifier-naming) -extern "C" { - int __wrap_vprintf(const char *fmt, va_list ap) { char buf[PRINTF_BUFFER_SIZE]; return write_printf_buffer(stdout, buf, vsnprintf(buf, sizeof(buf), fmt, ap)); } +int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) { + char buf[PRINTF_BUFFER_SIZE]; + return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap)); +} + int __wrap_printf(const char *fmt, ...) { va_list ap; va_start(ap, fmt); @@ -71,11 +82,6 @@ int __wrap_printf(const char *fmt, ...) { return len; } -int __wrap_vfprintf(FILE *stream, const char *fmt, va_list ap) { - char buf[PRINTF_BUFFER_SIZE]; - return write_printf_buffer(stream, buf, vsnprintf(buf, sizeof(buf), fmt, ap)); -} - int __wrap_fprintf(FILE *stream, const char *fmt, ...) { va_list ap; va_start(ap, fmt); diff --git a/esphome/components/lvgl/lvgl_esphome.cpp b/esphome/components/lvgl/lvgl_esphome.cpp index d8248e4aa4..722a7a1b02 100644 --- a/esphome/components/lvgl/lvgl_esphome.cpp +++ b/esphome/components/lvgl/lvgl_esphome.cpp @@ -454,10 +454,12 @@ void LVTouchListener::update(const touchscreen::TouchPoints_t &tpoints) { #ifdef USE_LVGL_METER -int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value) { +int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value) { auto *scale = lv_obj_get_parent(obj); auto min_value = lv_scale_get_range_min_value(scale); - return ((value - min_value) * lv_scale_get_angle_range(scale) / (lv_scale_get_range_max_value(scale) - min_value) + + auto max_value = lv_scale_get_range_max_value(scale); + value = clamp(value, min_value, max_value); + return ((value - min_value) * lv_scale_get_angle_range(scale) / (max_value - min_value) + lv_scale_get_rotation((scale))) % 360; } diff --git a/esphome/components/lvgl/lvgl_esphome.h b/esphome/components/lvgl/lvgl_esphome.h index 146866f5bd..8c0b10e1bc 100644 --- a/esphome/components/lvgl/lvgl_esphome.h +++ b/esphome/components/lvgl/lvgl_esphome.h @@ -112,7 +112,7 @@ inline void lv_animimg_set_src(lv_obj_t *img, std::vector images #endif // USE_LVGL_ANIMIMG #ifdef USE_LVGL_METER -int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int value); +int16_t lv_get_needle_angle_for_value(lv_obj_t *obj, int32_t value); #endif // Parent class for things that wrap an LVGL object diff --git a/esphome/components/mcp23xxx_base/__init__.py b/esphome/components/mcp23xxx_base/__init__.py index cd952099c0..76a3aabe3f 100644 --- a/esphome/components/mcp23xxx_base/__init__.py +++ b/esphome/components/mcp23xxx_base/__init__.py @@ -2,6 +2,7 @@ from esphome import pins import esphome.codegen as cg import esphome.config_validation as cv from esphome.const import ( + CONF_ALLOW_OTHER_USES, CONF_ID, CONF_INPUT, CONF_INTERRUPT, @@ -30,10 +31,29 @@ MCP23XXX_INTERRUPT_MODES = { "FALLING": MCP23XXXInterruptMode.MCP23XXX_FALLING, } + +def _validate_interrupt_pin(value): + # The MCP component owns INT polarity (active-low, hardcoded falling-edge ISR) + # and installs a single ISR per GPIO, so neither inversion nor sharing is supported. + value = pins.internal_gpio_input_pin_schema(value) + if value.get(CONF_INVERTED): + raise cv.Invalid( + f"'{CONF_INVERTED}: true' is not supported on '{CONF_INTERRUPT_PIN}'; " + "the MCP23xxx INT line is fixed active-low" + ) + if value.get(CONF_ALLOW_OTHER_USES): + raise cv.Invalid( + f"'{CONF_ALLOW_OTHER_USES}: true' is not supported on '{CONF_INTERRUPT_PIN}'; " + "sharing the interrupt pin between multiple MCP23xxx (or other components) " + "is not implemented. Remove the interrupt_pin to fall back to polling." + ) + return value + + MCP23XXX_CONFIG_SCHEMA = cv.Schema( { cv.Optional(CONF_OPEN_DRAIN_INTERRUPT, default=False): cv.boolean, - cv.Optional(CONF_INTERRUPT_PIN): pins.internal_gpio_input_pin_schema, + cv.Optional(CONF_INTERRUPT_PIN): _validate_interrupt_pin, } ).extend(cv.COMPONENT_SCHEMA) diff --git a/esphome/const.py b/esphome/const.py index 89b6ff15ee..bc31ab36b5 100644 --- a/esphome/const.py +++ b/esphome/const.py @@ -4,7 +4,7 @@ from enum import Enum from esphome.enum import StrEnum -__version__ = "2026.4.3" +__version__ = "2026.4.4" ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_" VALID_SUBSTITUTIONS_CHARACTERS = ( diff --git a/esphome/platformio_api.py b/esphome/platformio_api.py index fc21977fdd..544bca3b94 100644 --- a/esphome/platformio_api.py +++ b/esphome/platformio_api.py @@ -53,6 +53,37 @@ FILTER_PLATFORMIO_LINES = [ ] +def _strip_win_long_path_prefix(path: str) -> str: + r"""Strip the Windows extended-length path prefix from ``path``. + + Handles both forms documented at + https://learn.microsoft.com/windows/win32/fileio/naming-a-file: + + * ``\\?\C:\path\to\file`` -> ``C:\path\to\file`` + * ``\\?\UNC\server\share\path`` -> ``\\server\share\path`` + + The NSIS-installed ``esphome.exe`` launcher on Windows starts Python with + ``sys.executable`` already prefixed with ``\\?\``. That prefix propagates + into PlatformIO's ``$PYTHONEXE`` (PlatformIO reads ``PYTHONEXEPATH`` from + the environment, falling back to ``os.path.normpath(sys.executable)``) + and ends up baked into SCons-emitted command lines for build steps such + as the esp8266 ``elf2bin`` invocation. ``cmd.exe`` does not understand + the ``\\?\`` prefix, so the build fails with + "The system cannot find the path specified." Stripping the prefix early + keeps the path shell-quotable. + + No-op on non-Windows platforms. + """ + if sys.platform != "win32": + return path + if path.startswith("\\\\?\\UNC\\"): + # \\?\UNC\server\share\... -> \\server\share\... + return "\\\\" + path[len("\\\\?\\UNC\\") :] + if path.startswith("\\\\?\\"): + return path[len("\\\\?\\") :] + return path + + def run_platformio_cli(*args, **kwargs) -> str | int: os.environ["PLATFORMIO_FORCE_COLOR"] = "true" os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute()) @@ -63,7 +94,18 @@ def run_platformio_cli(*args, **kwargs) -> str | int: os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning") # Increase uv retry count to handle transient network errors (default is 3) os.environ.setdefault("UV_HTTP_RETRIES", "10") - cmd = [sys.executable, "-m", "esphome.platformio_runner"] + list(args) + # Strip the Windows extended-length path prefix from sys.executable so it + # doesn't propagate into PlatformIO's $PYTHONEXE and break SCons-emitted + # command lines run through cmd.exe. + python_exe = _strip_win_long_path_prefix(sys.executable) + if python_exe != sys.executable: + # Only override PYTHONEXEPATH when we actually stripped a prefix. + # PlatformIO's get_pythonexe_path() reads this and falls back to + # sys.executable otherwise; setting it unconditionally would clobber + # a user-provided value (or the unmodified path on platforms that + # don't need the strip). + os.environ["PYTHONEXEPATH"] = python_exe + cmd = [python_exe, "-m", "esphome.platformio_runner"] + list(args) return run_external_process(*cmd, **kwargs) diff --git a/esphome/util.py b/esphome/util.py index 73cc3aa5ab..e96a52de86 100644 --- a/esphome/util.py +++ b/esphome/util.py @@ -94,13 +94,29 @@ def safe_print(message="", end="\n"): except UnicodeEncodeError: pass + # Fall back to the stream's actual encoding (e.g. cp1252 on Windows + # redirected pipes). Use "backslashreplace" so unencodable code points + # like the wifi signal-bar block characters (U+2582..U+2588) become + # readable ``\uXXXX`` escapes, and decode back to ``str`` so ``print`` + # never receives a ``bytes`` object (which would render as a ``b'...'`` + # repr). + encoding = sys.stdout.encoding or "ascii" try: - print(message.encode("utf-8", "backslashreplace"), end=end) + print( + message.encode(encoding, "backslashreplace").decode(encoding), + end=end, + ) + return except UnicodeEncodeError: - try: - print(message.encode("ascii", "backslashreplace"), end=end) - except UnicodeEncodeError: - print("Cannot print line because of invalid locale!") + pass + + try: + print( + message.encode("ascii", "backslashreplace").decode("ascii"), + end=end, + ) + except UnicodeEncodeError: + print("Cannot print line because of invalid locale!") def safe_input(prompt=""): diff --git a/tests/components/api/common-base.yaml b/tests/components/api/common-base.yaml index c766b61b13..504c52a57b 100644 --- a/tests/components/api/common-base.yaml +++ b/tests/components/api/common-base.yaml @@ -91,6 +91,24 @@ api: - float_arr.size() - string_arr[0].c_str() - string_arr.size() + # Test array + string args used after a non-synchronous action (delay). + # The default non-owning types (StringRef, const FixedVector&) would + # dangle once rx_buf_ is reused, and FixedVector is non-copyable so + # DelayAction's lambda capture would fail to compile. The api codegen + # must fall back to owning std::string / std::vector here. + - action: array_with_delay + variables: + name: string + int_arr: int[] + string_arr: string[] + then: + - delay: 20ms + - logger.log: + format: "Delayed: %s (%u ints, %u strings)" + args: + - name.c_str() + - int_arr.size() + - string_arr.size() # Test ContinuationAction (IfAction with then/else branches) - action: test_if_action variables: diff --git a/tests/unit_tests/test_platformio_api.py b/tests/unit_tests/test_platformio_api.py index ddc4e45c84..a92e00167d 100644 --- a/tests/unit_tests/test_platformio_api.py +++ b/tests/unit_tests/test_platformio_api.py @@ -311,6 +311,105 @@ def test_run_platformio_cli_sets_environment_variables( assert "arg" in args +@pytest.mark.parametrize( + ("platform", "input_path", "expected"), + [ + # win32: drive-letter extended-length prefix is stripped + ( + "win32", + "\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe", + "C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe", + ), + # win32: UNC extended-length prefix is translated to a regular UNC path + ( + "win32", + "\\\\?\\UNC\\server\\share\\python.exe", + "\\\\server\\share\\python.exe", + ), + # win32: paths without the prefix are returned unchanged + ( + "win32", + "C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe", + "C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe", + ), + # non-win32: prefix is left alone (no-op) + ("linux", "\\\\?\\C:\\python.exe", "\\\\?\\C:\\python.exe"), + ("darwin", "/usr/bin/python3", "/usr/bin/python3"), + ], +) +def test_strip_win_long_path_prefix( + platform: str, input_path: str, expected: str +) -> None: + r"""``\\?\`` and ``\\?\UNC\`` prefixes are stripped only on win32.""" + with patch("esphome.platformio_api.sys.platform", platform): + assert platformio_api._strip_win_long_path_prefix(input_path) == expected + + +def test_run_platformio_cli_strips_win_long_path_prefix( + setup_core: Path, mock_run_external_process: Mock +) -> None: + r"""Windows ``\\?\`` prefix on sys.executable does not leak into the subprocess. + + The NSIS-installed esphome.exe launcher starts Python with + ``sys.executable`` already prefixed by the extended-length path marker. + That prefix would otherwise propagate into PlatformIO's ``PYTHONEXE`` and + break SCons-emitted command lines run through ``cmd.exe``. + """ + CORE.build_path = str(setup_core / "build" / "test") + prefixed_exe = ( + "\\\\?\\C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe" + ) + stripped_exe = ( + "C:\\Users\\jesse\\AppData\\Local\\ESPHome Builder\\python\\python.exe" + ) + + with ( + patch.dict(os.environ, {}, clear=False), + patch("esphome.platformio_api.sys.platform", "win32"), + patch("esphome.platformio_api.sys.executable", prefixed_exe), + ): + # Pop any pre-existing PYTHONEXEPATH so the assertion below reflects + # what run_platformio_cli set, not whatever the test runner's + # environment happened to contain. + os.environ.pop("PYTHONEXEPATH", None) + mock_run_external_process.return_value = 0 + platformio_api.run_platformio_cli("test", "arg") + + # The subprocess is invoked with the stripped executable path. + mock_run_external_process.assert_called_once() + args = mock_run_external_process.call_args[0] + assert args[0] == stripped_exe + # PYTHONEXEPATH is exported with the stripped path so PlatformIO's + # get_pythonexe_path() picks it up in the subprocess. + assert os.environ["PYTHONEXEPATH"] == stripped_exe + + +def test_run_platformio_cli_does_not_set_pythonexepath_without_strip( + setup_core: Path, mock_run_external_process: Mock +) -> None: + r"""PYTHONEXEPATH is not touched when sys.executable has no ``\\?\`` prefix. + + Setting it unconditionally would clobber a user-provided value (or + interfere with non-Windows tooling that has no prefix to strip). + """ + CORE.build_path = str(setup_core / "build" / "test") + plain_exe = "/usr/bin/python3" + + with ( + patch.dict(os.environ, {}, clear=False), + patch("esphome.platformio_api.sys.platform", "linux"), + patch("esphome.platformio_api.sys.executable", plain_exe), + ): + os.environ.pop("PYTHONEXEPATH", None) + mock_run_external_process.return_value = 0 + platformio_api.run_platformio_cli("test", "arg") + + mock_run_external_process.assert_called_once() + args = mock_run_external_process.call_args[0] + assert args[0] == plain_exe + assert "PYTHONEXEPATH" not in os.environ + + def test_run_platformio_cli_run_builds_command( setup_core: Path, mock_run_platformio_cli: Mock ) -> None: diff --git a/tests/unit_tests/test_util.py b/tests/unit_tests/test_util.py index ff58fb1394..581b1aca99 100644 --- a/tests/unit_tests/test_util.py +++ b/tests/unit_tests/test_util.py @@ -709,3 +709,119 @@ def test_detect_rp2040_bootsel_timeout() -> None: result = util.detect_rp2040_bootsel("/usr/bin/picotool") assert result.device_count == 0 assert result.permission_error is False + + +class TestSafePrint: + """Tests for ``safe_print`` and its UnicodeEncodeError fallback chain.""" + + @pytest.fixture(autouse=True) + def _no_dashboard(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Default ``CORE.dashboard`` to False so each test starts hermetic.""" + from esphome.core import CORE + + monkeypatch.setattr(CORE, "dashboard", False) + + def test_prints_plain_message(self, capsys: pytest.CaptureFixture[str]) -> None: + """ASCII-only messages take the fast path through native ``print``.""" + util.safe_print("hello world") + assert capsys.readouterr().out == "hello world\n" + + def test_prints_unicode_on_utf8_stdout( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + """Non-ASCII goes straight through when stdout can encode it.""" + util.safe_print("bars: \u2582\u2584\u2586\u2588") + assert capsys.readouterr().out == "bars: \u2582\u2584\u2586\u2588\n" + + def test_dashboard_escapes_esc_byte( + self, + capsys: pytest.CaptureFixture[str], + monkeypatch: pytest.MonkeyPatch, + ) -> None: + r"""Dashboard mode escapes raw ``\033`` ESC bytes to literal ``\\033``.""" + from esphome.core import CORE + + monkeypatch.setattr(CORE, "dashboard", True) + util.safe_print("\033[0;32mhi\033[0m") + assert capsys.readouterr().out == "\\033[0;32mhi\\033[0m\n" + + def test_fallback_writes_string_not_bytes_repr( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Regression: cp1252 fallback must produce a printable str, not ``b'...'``. + + On Windows, when stdout is a redirected pipe (e.g. the dashboard), + Python uses cp1252, which cannot encode the wifi signal-bar block + characters (U+2582..U+2588). The previous fallback path called + ``print(message.encode(...))`` with a ``bytes`` object, which + Python's ``print`` rendered as a literal ``b'...'`` repr — visible + in the user's dashboard output. The fix re-encodes through the + stream's encoding with ``backslashreplace`` and decodes back to + ``str``. + """ + buf = io.BytesIO() + cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict") + monkeypatch.setattr(sys, "stdout", cp1252_stream) + + util.safe_print("bars: \u2582\u2584\u2586\u2588 done") + cp1252_stream.flush() + output = buf.getvalue().decode("cp1252") + + # Output is a clean line, not the bytes repr. + assert not output.startswith("b'") + assert "b'bars" not in output + # Unencodable codepoints become readable backslash escapes. + assert "\\u2582\\u2584\\u2586\\u2588" in output + # Encodable parts survive unchanged. + assert "bars: " in output + assert " done" in output + assert output.endswith("\n") + + def test_fallback_with_dashboard_escaped_message( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Dashboard ESC escaping + cp1252 fallback compose correctly.""" + from esphome.core import CORE + + monkeypatch.setattr(CORE, "dashboard", True) + buf = io.BytesIO() + cp1252_stream = io.TextIOWrapper(buf, encoding="cp1252", errors="strict") + monkeypatch.setattr(sys, "stdout", cp1252_stream) + + util.safe_print("\033[0;32m\u2582\u2584\u2586\u2588\033[0m") + cp1252_stream.flush() + output = buf.getvalue().decode("cp1252") + + # Dashboard escaping turned ESC into literal "\033" (5 chars), which + # cp1252 can encode, so it survives the round-trip verbatim. + assert "\\033[0;32m" in output + assert "\\033[0m" in output + # Block characters became backslash escapes via backslashreplace. + assert "\\u2582\\u2584\\u2586\\u2588" in output + + def test_final_message_when_locale_is_invalid( + self, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], + ) -> None: + """If every encoding path fails, surface the locale-error sentinel.""" + original_print = print + call_count = 0 + + def fake_print(*args: Any, **kwargs: Any) -> None: + nonlocal call_count + call_count += 1 + # The first three calls are: native print, stream-encoding + # fallback, ASCII fallback. Make all three raise so we reach + # the final sentinel "Cannot print line..." which is expected + # to succeed (no encoding required). + if call_count <= 3: + raise UnicodeEncodeError("ascii", "x", 0, 1, "boom") + original_print(*args, **kwargs) + + monkeypatch.setattr("builtins.print", fake_print) + util.safe_print("x") + assert call_count == 4 + assert ( + capsys.readouterr().out == "Cannot print line because of invalid locale!\n" + )