From 21e548f1d78a3ed225694bb9ef3d8df7feab71cd Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 27 May 2026 09:20:50 -0500 Subject: [PATCH] [core] Sensitive redaction via yaml_util representer (#16690) --- esphome/__main__.py | 38 +++++- esphome/components/wifi/__init__.py | 6 +- esphome/config_validation.py | 10 +- esphome/yaml_util.py | 39 +++++- tests/unit_tests/test_config_validation.py | 37 ++++++ tests/unit_tests/test_main.py | 131 +++++++++++++++++++++ tests/unit_tests/test_yaml_util.py | 55 +++++++++ 7 files changed, 306 insertions(+), 10 deletions(-) diff --git a/esphome/__main__.py b/esphome/__main__.py index 03f12c75d7..000087063f 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -1412,17 +1412,47 @@ def command_config(args: ArgsProtocol, config: ConfigType) -> int | None: if not CORE.verbose: config = strip_default_ids(config) output = yaml_util.dump(config, args.show_secrets) - # add the console decoration so the front-end can hide the secrets if not args.show_secrets: - output = re.sub( - r"(password|key|psk|ssid)\: (.+)", r"\1: \\033[8m\2\\033[28m", output - ) + output = _redact_with_legacy_fallback(output) if not CORE.quiet: safe_print(output) _LOGGER.info("Configuration is valid!") return 0 +# Legacy substring redaction fallback for unmigrated schemas; removed in +# 2026.12.0 once canonical sensitive fields are tagged. The lookahead skips +# values that already render themselves: ``\033[8m`` (SensitiveStr wrap), +# ``!secret`` (preserves the user-friendly tag), ``!lambda`` (multi-line +# block; first line is structural). The fragment must either start the +# field name or follow ``_`` so the warning names a real field; this avoids +# false positives like ``monkey:`` matching the ``key`` fragment. 
+_LEGACY_REDACTION_RE = re.compile( + r"(?P<key>\b(?:\w+_)?(?:password|key|psk|ssid))\: " + r"(?!\\033\[8m|!secret\b|!lambda\b)(?P<val>.+)" +) +_LEGACY_REDACTION_REMOVAL = "2026.12.0" + + +def _redact_with_legacy_fallback(output: str) -> str: + unmarked: set[str] = set() + + def _replace(m: re.Match[str]) -> str: + unmarked.add(m.group("key")) + return f"{m.group('key')}: \\033[8m{m.group('val')}\\033[28m" + + output = _LEGACY_REDACTION_RE.sub(_replace, output) + for key in sorted(unmarked): + _LOGGER.warning( + "Field '%s' is being redacted by a legacy substring heuristic. " + "Mark this field's schema validator with cv.sensitive(...) for " + "deterministic redaction; the heuristic will be removed in %s.", + key, + _LEGACY_REDACTION_REMOVAL, + ) + return output + + def command_config_hash(args: ArgsProtocol, config: ConfigType) -> int | None: # generating code might modify config, so it must be done in order to generate # a hash that will match what was generated when compiling and then running diff --git a/esphome/components/wifi/__init__.py b/esphome/components/wifi/__init__.py index 4e7dcc82e5..b7719c80d1 100644 --- a/esphome/components/wifi/__init__.py +++ b/esphome/components/wifi/__init__.py @@ -271,7 +271,7 @@ EAP_AUTH_SCHEMA = cv.All( WIFI_NETWORK_BASE = cv.Schema( { cv.GenerateID(): cv.declare_id(WiFiAP), - cv.Optional(CONF_SSID): cv.ssid, + cv.Optional(CONF_SSID): cv.sensitive(cv.ssid), cv.Optional(CONF_PASSWORD): cv.sensitive(validate_password), cv.Optional(CONF_CHANNEL): validate_channel, cv.Optional(CONF_MANUAL_IP): STA_MANUAL_IP_SCHEMA, @@ -434,7 +434,7 @@ CONFIG_SCHEMA = cv.All( cv.Optional(CONF_NETWORKS): cv.All( cv.ensure_list(WIFI_NETWORK_STA), cv.Length(max=MAX_WIFI_NETWORKS) ), - cv.Optional(CONF_SSID): cv.ssid, + cv.Optional(CONF_SSID): cv.sensitive(cv.ssid), cv.Optional(CONF_PASSWORD): cv.sensitive(validate_password), cv.Optional(CONF_MANUAL_IP): STA_MANUAL_IP_SCHEMA, cv.Optional(CONF_EAP): EAP_AUTH_SCHEMA, @@ -850,7 +850,7 @@ async def final_step(): 
WiFiConfigureAction, cv.Schema( { - cv.Required(CONF_SSID): cv.templatable(cv.ssid), + cv.Required(CONF_SSID): cv.sensitive(cv.templatable(cv.ssid)), cv.Required(CONF_PASSWORD): cv.sensitive(cv.templatable(validate_password)), cv.Optional(CONF_SAVE, default=True): cv.templatable(cv.boolean), cv.Optional(CONF_TIMEOUT, default="30000ms"): cv.templatable( diff --git a/esphome/config_validation.py b/esphome/config_validation.py index 2f09fdc105..0ef6d212fe 100644 --- a/esphome/config_validation.py +++ b/esphome/config_validation.py @@ -101,7 +101,7 @@ from esphome.schema_extractors import ( ) from esphome.util import parse_esphome_version from esphome.voluptuous_schema import _Schema -from esphome.yaml_util import make_data_base +from esphome.yaml_util import SensitiveStr, make_data_base _LOGGER = logging.getLogger(__name__) @@ -514,7 +514,13 @@ class SensitiveValidator: self.inner = inner def __call__(self, value: typing.Any) -> typing.Any: - return self.inner(value) + validated = self.inner(value) + # Tag string results so yaml_util.dump can mask them. Non-string + # results pass through unchanged; already-tagged values are not + # re-wrapped to keep nested cv.sensitive applications idempotent. + if isinstance(validated, str) and not isinstance(validated, SensitiveStr): + return SensitiveStr(validated) + return validated def __repr__(self) -> str: # Mirror the inner validator's repr so ``build_language_schema``'s diff --git a/esphome/yaml_util.py b/esphome/yaml_util.py index 28f72ab831..bfe1fb0136 100644 --- a/esphome/yaml_util.py +++ b/esphome/yaml_util.py @@ -52,6 +52,16 @@ _load_listeners: list[Callable[[Path], None]] = [] DocumentPath = list[str | int] +class SensitiveStr(str): + """Marker subclass for validated strings that should be masked in + user-visible YAML output. ``cv.sensitive`` wraps validated values in this + type so ``dump()`` can render them with ANSI conceal codes without + needing a post-process regex. 
+ """ + + __slots__ = () + + @contextmanager def track_yaml_loads() -> Generator[list[Path]]: """Context manager that records every file loaded by the YAML loader. @@ -808,11 +818,18 @@ def dump(dict_, show_secrets=False, sort_keys=False): if show_secrets: _SECRET_VALUES.clear() _SECRET_CACHE.clear() + + # Per-call subclass so the redaction flag doesn't leak across calls. + # (``_SECRET_VALUES`` / ``_SECRET_CACHE`` remain module globals; YAML + # processing is single-threaded today, so this isolates only the flag.) + class _Dumper(ESPHomeDumper): + _redact_sensitive = not show_secrets + return yaml.dump( dict_, default_flow_style=False, allow_unicode=True, - Dumper=ESPHomeDumper, + Dumper=_Dumper, sort_keys=sort_keys, ) @@ -958,6 +975,10 @@ def format_path(path: DocumentPath, current_obj: Any) -> str: class ESPHomeDumper(yaml.SafeDumper): + # Default for the base class; per-call subclass in ``dump()`` overrides. + # When True, ``represent_sensitive`` wraps values in ANSI conceal codes. + _redact_sensitive: bool = False + def represent_mapping(self, tag, mapping, flow_style=None): value = [] node = yaml.MappingNode(tag, value, flow_style=flow_style) @@ -992,6 +1013,20 @@ class ESPHomeDumper(yaml.SafeDumper): return self.represent_secret(value) return self.represent_scalar(tag="tag:yaml.org,2002:str", value=str(value)) + def represent_sensitive(self, value: SensitiveStr) -> yaml.ScalarNode: + # Only the redact-and-not-a-secret branch is unique to sensitive + # values; otherwise let ``represent_stringify`` handle ``!secret`` + # precedence and the plain-str fallthrough. Conceal sequence is + # emitted as literal ``\033`` text (not actual ESC bytes) so the + # output matches the prior regex format and device-builder's + # ``\033[8m...\033[28m`` parser keeps working. 
+ if self._redact_sensitive and not is_secret(value): + return self.represent_scalar( + tag="tag:yaml.org,2002:str", + value=f"\\033[8m{value}\\033[28m", + ) + return self.represent_stringify(value) + # pylint: disable=arguments-renamed def represent_bool(self, value): return self.represent_scalar( @@ -1063,6 +1098,8 @@ ESPHomeDumper.add_multi_representer( ) ESPHomeDumper.add_multi_representer(bool, ESPHomeDumper.represent_bool) ESPHomeDumper.add_multi_representer(str, ESPHomeDumper.represent_stringify) +# MRO-walked dispatch; SensitiveStr's own entry wins over the str one. +ESPHomeDumper.add_multi_representer(SensitiveStr, ESPHomeDumper.represent_sensitive) ESPHomeDumper.add_multi_representer(int, ESPHomeDumper.represent_int) ESPHomeDumper.add_multi_representer(float, ESPHomeDumper.represent_float) ESPHomeDumper.add_multi_representer(_BaseAddress, ESPHomeDumper.represent_stringify) diff --git a/tests/unit_tests/test_config_validation.py b/tests/unit_tests/test_config_validation.py index 2c34cbfb07..74d9a5047a 100644 --- a/tests/unit_tests/test_config_validation.py +++ b/tests/unit_tests/test_config_validation.py @@ -27,6 +27,7 @@ from esphome.const import ( SCHEDULER_DONT_RUN, ) from esphome.core import CORE, HexInt, Lambda +from esphome.yaml_util import SensitiveStr def test_check_not_templatable__invalid(): @@ -145,6 +146,42 @@ def test_sensitive__custom_inner_delegates_validation() -> None: validator(123) +def test_sensitive__wraps_string_result_in_sensitive_str() -> None: + validator = config_validation.sensitive() + result = validator("hunter2") + + assert isinstance(result, SensitiveStr) + assert isinstance(result, str) + assert result == "hunter2" + + +def test_sensitive__does_not_double_tag_already_sensitive() -> None: + # If the inner validator already returns a SensitiveStr (e.g., nested + # cv.sensitive wrappers), re-tagging is a no-op rather than a new + # SensitiveStr around the same value. 
+ pre_tagged = SensitiveStr("hunter2") + + def inner(_value): + return pre_tagged + + validator = config_validation.sensitive(inner) + result = validator("anything") + + assert result is pre_tagged + + +def test_sensitive__non_string_result_passes_through() -> None: + # If an inner validator returns something other than a string (e.g., a + # Lambda template), the sensitive wrapper must not coerce it. + sentinel = object() + + def inner(_value): + return sentinel + + validator = config_validation.sensitive(inner) + assert validator("anything") is sentinel + + def test_sensitive__is_detectable_via_isinstance() -> None: validator = config_validation.sensitive() diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index f6b6d0b05f..26b550669f 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -22,6 +22,7 @@ from esphome.__main__ import ( Purpose, _get_configured_xtal_freq, _make_crystal_freq_callback, + _redact_with_legacy_fallback, _resolve_network_devices, _validate_bootloader_binary, _validate_partition_table_binary, @@ -29,6 +30,7 @@ from esphome.__main__ import ( command_analyze_memory, command_bundle, command_clean_all, + command_config, command_config_hash, command_rename, command_run, @@ -340,6 +342,135 @@ def mock_ram_strings_analyzer() -> Generator[Mock]: yield mock_class +def test_redact_with_legacy_fallback__wraps_unmarked_field( + caplog: pytest.LogCaptureFixture, +) -> None: + """Unmarked sensitive-shaped fields are redacted; a deprecation warning + is emitted naming the field.""" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback("password: hunter2\n") + assert "password: \\033[8mhunter2\\033[28m" in out + assert any( + "password" in rec.message and "cv.sensitive" in rec.message + for rec in caplog.records + ) + + +def test_redact_with_legacy_fallback__skips_already_wrapped( + caplog: pytest.LogCaptureFixture, +) -> None: + """Values already wrapped 
by the SensitiveStr representer don't trigger + the heuristic or the warning.""" + wrapped = "password: \\033[8mhunter2\\033[28m\n" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback(wrapped) + assert out == wrapped + assert not any("legacy substring" in rec.message for rec in caplog.records) + + +def test_redact_with_legacy_fallback__captures_full_field_name( + caplog: pytest.LogCaptureFixture, +) -> None: + """The warning names the actual field, not just the matched fragment.""" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + _redact_with_legacy_fallback("encryption_key: abc\n") + assert any("encryption_key" in rec.message for rec in caplog.records) + + +def test_redact_with_legacy_fallback__deduplicates_warnings( + caplog: pytest.LogCaptureFixture, +) -> None: + """One warning per unique field name even if it appears many times.""" + text = "password: a\npassword: b\npassword: c\n" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + _redact_with_legacy_fallback(text) + password_warnings = [rec for rec in caplog.records if "'password'" in rec.message] + assert len(password_warnings) == 1 + + +def test_redact_with_legacy_fallback__skips_lambda_values( + caplog: pytest.LogCaptureFixture, +) -> None: + """``!lambda`` first line is structural, body is unreachable by a + single-line regex anyway, and tagged fields shouldn't trigger a warning.""" + text = ' ssid: !lambda |-\n return "x";\n' + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback(text) + assert out == text + assert not any("legacy substring" in rec.message for rec in caplog.records) + + +def test_redact_with_legacy_fallback__skips_secret_references( + caplog: pytest.LogCaptureFixture, +) -> None: + """``!secret name`` is the dumper's user-friendly representation; the + name isn't the secret, so wrapping it would clobber the round-trip.""" + text = " password: 
!secret wifi_password\n" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback(text) + assert out == text + assert not any("legacy substring" in rec.message for rec in caplog.records) + + +def test_redact_with_legacy_fallback__does_not_match_fragment_in_middle( + caplog: pytest.LogCaptureFixture, +) -> None: + """Fragment must end the field name; embedded matches like + ``key_value_pair`` are unrelated to a sensitive key and must not be + redacted (matching the prior regex's scope).""" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback("key_value_pair: abc\n") + assert "\\033[8m" not in out + assert not any("legacy substring" in rec.message for rec in caplog.records) + + +def test_redact_with_legacy_fallback__does_not_match_fragment_as_suffix( + caplog: pytest.LogCaptureFixture, +) -> None: + """Fragment must start the name or follow ``_``; ``monkey:`` shouldn't + fire a 'legacy heuristic' warning because there's no sensitive field + here — the user has nothing to migrate.""" + with caplog.at_level(logging.WARNING, logger="esphome.__main__"): + out = _redact_with_legacy_fallback("monkey: 1234\n") + assert "\\033[8m" not in out + assert not any("legacy substring" in rec.message for rec in caplog.records) + + +def test_command_config__invokes_legacy_fallback_when_redacting( + tmp_path: Path, capfd: CaptureFixture[str] +) -> None: + """``command_config`` runs the legacy fallback on the dumped output when + ``--show-secrets`` is off. Cover the wiring (not just the helper). 
+ """ + setup_core(tmp_path=tmp_path, config={"esphome": {"name": "test"}}) + args = MockArgs() + args.show_secrets = False + + result = command_config(args, {"wifi": {"password": "hunter2"}}) + + assert result == 0 + output = capfd.readouterr().out + assert "\\033[8mhunter2\\033[28m" in output + + +def test_command_config__show_secrets_skips_redaction( + tmp_path: Path, capfd: CaptureFixture[str] +) -> None: + """With ``--show-secrets`` the helper isn't invoked and the value + renders raw. + """ + setup_core(tmp_path=tmp_path, config={"esphome": {"name": "test"}}) + args = MockArgs() + args.show_secrets = True + + result = command_config(args, {"wifi": {"password": "hunter2"}}) + + assert result == 0 + output = capfd.readouterr().out + assert "hunter2" in output + assert "\\033[8m" not in output + + def test_choose_upload_log_host_with_string_default() -> None: """Test with a single string default device.""" setup_core() diff --git a/tests/unit_tests/test_yaml_util.py b/tests/unit_tests/test_yaml_util.py index d6fb5b81f2..6be090b869 100644 --- a/tests/unit_tests/test_yaml_util.py +++ b/tests/unit_tests/test_yaml_util.py @@ -15,6 +15,7 @@ from esphome.yaml_util import ( DiscoveredYamlFiles, ESPHomeDataBase, ESPLiteralValue, + SensitiveStr, discover_user_yaml_files, force_load_include_files, format_path, @@ -1340,3 +1341,57 @@ def test_frontmatter_included_file_stored(tmp_path: Path) -> None: assert main.resolve() not in core.CORE.frontmatter # Included file's frontmatter is captured assert core.CORE.frontmatter[inc.resolve()]["child_meta"] == "hello" + + +def test_sensitive_str__is_a_str_subclass() -> None: + value = SensitiveStr("hunter2") + assert isinstance(value, str) + assert value == "hunter2" + + +def test_dump__redacts_sensitive_str_by_default() -> None: + out = yaml_util.dump({"password": SensitiveStr("hunter2")}) + assert "\\033[8mhunter2\\033[28m" in out + assert "hunter2" not in out.replace( + "\\033[8mhunter2\\033[28m", "" + ) # the raw value is only 
present inside the wrap + + +def test_dump__show_secrets_emits_sensitive_str_raw() -> None: + out = yaml_util.dump({"password": SensitiveStr("hunter2")}, show_secrets=True) + assert "hunter2" in out + assert "\\033[8m" not in out + assert "\\033[28m" not in out + + +def test_dump__plain_str_is_not_redacted() -> None: + out = yaml_util.dump({"hostname": "myserver"}) + assert "myserver" in out + assert "\\033[8m" not in out + + +def test_dump__secret_reference_wins_over_redaction() -> None: + # If the value also has an entry in _SECRET_VALUES (i.e., it was loaded + # via !secret), the dump should render it as !secret <name>, not as a + # redacted scalar. SensitiveStr layered on top must not change that. + value = SensitiveStr("hunter2") + yaml_util._SECRET_VALUES[str(value)] = "my_secret_name" + try: + out = yaml_util.dump({"password": value}) + assert "!secret" in out + assert "my_secret_name" in out + assert "\\033[8m" not in out + finally: + yaml_util._SECRET_VALUES.clear() + + +def test_dump__redaction_flag_does_not_leak_between_calls() -> None: + # Per-call _Dumper subclass means show_secrets in one call doesn't + # affect another. Run them in both orders to catch any leakage. + redacted = yaml_util.dump({"password": SensitiveStr("hunter2")}) + raw = yaml_util.dump({"password": SensitiveStr("hunter2")}, show_secrets=True) + redacted_again = yaml_util.dump({"password": SensitiveStr("hunter2")}) + + assert "\\033[8m" in redacted + assert "\\033[8m" not in raw + assert "\\033[8m" in redacted_again