mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 12:35:25 +00:00
[core] Sensitive redaction via yaml_util representer (#16690)
This commit is contained in:
@@ -1412,17 +1412,47 @@ def command_config(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
if not CORE.verbose:
|
||||
config = strip_default_ids(config)
|
||||
output = yaml_util.dump(config, args.show_secrets)
|
||||
# add the console decoration so the front-end can hide the secrets
|
||||
if not args.show_secrets:
|
||||
output = re.sub(
|
||||
r"(password|key|psk|ssid)\: (.+)", r"\1: \\033[8m\2\\033[28m", output
|
||||
)
|
||||
output = _redact_with_legacy_fallback(output)
|
||||
if not CORE.quiet:
|
||||
safe_print(output)
|
||||
_LOGGER.info("Configuration is valid!")
|
||||
return 0
|
||||
|
||||
|
||||
# Legacy substring redaction fallback for unmigrated schemas; removed in
|
||||
# 2026.12.0 once canonical sensitive fields are tagged. The lookahead skips
|
||||
# values that already render themselves: ``\033[8m`` (SensitiveStr wrap),
|
||||
# ``!secret`` (preserves the user-friendly tag), ``!lambda`` (multi-line
|
||||
# block; first line is structural). The fragment must either start the
|
||||
# field name or follow ``_`` so the warning names a real field; this avoids
|
||||
# false positives like ``monkey:`` matching the ``key`` fragment.
|
||||
_LEGACY_REDACTION_RE = re.compile(
|
||||
r"(?P<key>\b(?:\w+_)?(?:password|key|psk|ssid))\: "
|
||||
r"(?!\\033\[8m|!secret\b|!lambda\b)(?P<val>.+)"
|
||||
)
|
||||
_LEGACY_REDACTION_REMOVAL = "2026.12.0"
|
||||
|
||||
|
||||
def _redact_with_legacy_fallback(output: str) -> str:
|
||||
unmarked: set[str] = set()
|
||||
|
||||
def _replace(m: re.Match[str]) -> str:
|
||||
unmarked.add(m.group("key"))
|
||||
return f"{m.group('key')}: \\033[8m{m.group('val')}\\033[28m"
|
||||
|
||||
output = _LEGACY_REDACTION_RE.sub(_replace, output)
|
||||
for key in sorted(unmarked):
|
||||
_LOGGER.warning(
|
||||
"Field '%s' is being redacted by a legacy substring heuristic. "
|
||||
"Mark this field's schema validator with cv.sensitive(...) for "
|
||||
"deterministic redaction; the heuristic will be removed in %s.",
|
||||
key,
|
||||
_LEGACY_REDACTION_REMOVAL,
|
||||
)
|
||||
return output
|
||||
|
||||
|
||||
def command_config_hash(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
# generating code might modify config, so it must be done in order to generate
|
||||
# a hash that will match what was generated when compiling and then running
|
||||
|
||||
@@ -271,7 +271,7 @@ EAP_AUTH_SCHEMA = cv.All(
|
||||
WIFI_NETWORK_BASE = cv.Schema(
|
||||
{
|
||||
cv.GenerateID(): cv.declare_id(WiFiAP),
|
||||
cv.Optional(CONF_SSID): cv.ssid,
|
||||
cv.Optional(CONF_SSID): cv.sensitive(cv.ssid),
|
||||
cv.Optional(CONF_PASSWORD): cv.sensitive(validate_password),
|
||||
cv.Optional(CONF_CHANNEL): validate_channel,
|
||||
cv.Optional(CONF_MANUAL_IP): STA_MANUAL_IP_SCHEMA,
|
||||
@@ -434,7 +434,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
cv.Optional(CONF_NETWORKS): cv.All(
|
||||
cv.ensure_list(WIFI_NETWORK_STA), cv.Length(max=MAX_WIFI_NETWORKS)
|
||||
),
|
||||
cv.Optional(CONF_SSID): cv.ssid,
|
||||
cv.Optional(CONF_SSID): cv.sensitive(cv.ssid),
|
||||
cv.Optional(CONF_PASSWORD): cv.sensitive(validate_password),
|
||||
cv.Optional(CONF_MANUAL_IP): STA_MANUAL_IP_SCHEMA,
|
||||
cv.Optional(CONF_EAP): EAP_AUTH_SCHEMA,
|
||||
@@ -850,7 +850,7 @@ async def final_step():
|
||||
WiFiConfigureAction,
|
||||
cv.Schema(
|
||||
{
|
||||
cv.Required(CONF_SSID): cv.templatable(cv.ssid),
|
||||
cv.Required(CONF_SSID): cv.sensitive(cv.templatable(cv.ssid)),
|
||||
cv.Required(CONF_PASSWORD): cv.sensitive(cv.templatable(validate_password)),
|
||||
cv.Optional(CONF_SAVE, default=True): cv.templatable(cv.boolean),
|
||||
cv.Optional(CONF_TIMEOUT, default="30000ms"): cv.templatable(
|
||||
|
||||
@@ -101,7 +101,7 @@ from esphome.schema_extractors import (
|
||||
)
|
||||
from esphome.util import parse_esphome_version
|
||||
from esphome.voluptuous_schema import _Schema
|
||||
from esphome.yaml_util import make_data_base
|
||||
from esphome.yaml_util import SensitiveStr, make_data_base
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -514,7 +514,13 @@ class SensitiveValidator:
|
||||
self.inner = inner
|
||||
|
||||
def __call__(self, value: typing.Any) -> typing.Any:
|
||||
return self.inner(value)
|
||||
validated = self.inner(value)
|
||||
# Tag string results so yaml_util.dump can mask them. Non-string
|
||||
# results pass through unchanged; already-tagged values are not
|
||||
# re-wrapped to keep nested cv.sensitive applications idempotent.
|
||||
if isinstance(validated, str) and not isinstance(validated, SensitiveStr):
|
||||
return SensitiveStr(validated)
|
||||
return validated
|
||||
|
||||
def __repr__(self) -> str:
|
||||
# Mirror the inner validator's repr so ``build_language_schema``'s
|
||||
|
||||
@@ -52,6 +52,16 @@ _load_listeners: list[Callable[[Path], None]] = []
|
||||
DocumentPath = list[str | int]
|
||||
|
||||
|
||||
class SensitiveStr(str):
|
||||
"""Marker subclass for validated strings that should be masked in
|
||||
user-visible YAML output. ``cv.sensitive`` wraps validated values in this
|
||||
type so ``dump()`` can render them with ANSI conceal codes without
|
||||
needing a post-process regex.
|
||||
"""
|
||||
|
||||
__slots__ = ()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def track_yaml_loads() -> Generator[list[Path]]:
|
||||
"""Context manager that records every file loaded by the YAML loader.
|
||||
@@ -808,11 +818,18 @@ def dump(dict_, show_secrets=False, sort_keys=False):
|
||||
if show_secrets:
|
||||
_SECRET_VALUES.clear()
|
||||
_SECRET_CACHE.clear()
|
||||
|
||||
# Per-call subclass so the redaction flag doesn't leak across calls.
|
||||
# (``_SECRET_VALUES`` / ``_SECRET_CACHE`` remain module globals; YAML
|
||||
# processing is single-threaded today, so this isolates only the flag.)
|
||||
class _Dumper(ESPHomeDumper):
|
||||
_redact_sensitive = not show_secrets
|
||||
|
||||
return yaml.dump(
|
||||
dict_,
|
||||
default_flow_style=False,
|
||||
allow_unicode=True,
|
||||
Dumper=ESPHomeDumper,
|
||||
Dumper=_Dumper,
|
||||
sort_keys=sort_keys,
|
||||
)
|
||||
|
||||
@@ -958,6 +975,10 @@ def format_path(path: DocumentPath, current_obj: Any) -> str:
|
||||
|
||||
|
||||
class ESPHomeDumper(yaml.SafeDumper):
|
||||
# Default for the base class; per-call subclass in ``dump()`` overrides.
|
||||
# When True, ``represent_sensitive`` wraps values in ANSI conceal codes.
|
||||
_redact_sensitive: bool = False
|
||||
|
||||
def represent_mapping(self, tag, mapping, flow_style=None):
|
||||
value = []
|
||||
node = yaml.MappingNode(tag, value, flow_style=flow_style)
|
||||
@@ -992,6 +1013,20 @@ class ESPHomeDumper(yaml.SafeDumper):
|
||||
return self.represent_secret(value)
|
||||
return self.represent_scalar(tag="tag:yaml.org,2002:str", value=str(value))
|
||||
|
||||
def represent_sensitive(self, value: SensitiveStr) -> yaml.ScalarNode:
|
||||
# Only the redact-and-not-a-secret branch is unique to sensitive
|
||||
# values; otherwise let ``represent_stringify`` handle ``!secret``
|
||||
# precedence and the plain-str fallthrough. Conceal sequence is
|
||||
# emitted as literal ``\033`` text (not actual ESC bytes) so the
|
||||
# output matches the prior regex format and device-builder's
|
||||
# ``\033[8m...\033[28m`` parser keeps working.
|
||||
if self._redact_sensitive and not is_secret(value):
|
||||
return self.represent_scalar(
|
||||
tag="tag:yaml.org,2002:str",
|
||||
value=f"\\033[8m{value}\\033[28m",
|
||||
)
|
||||
return self.represent_stringify(value)
|
||||
|
||||
# pylint: disable=arguments-renamed
|
||||
def represent_bool(self, value):
|
||||
return self.represent_scalar(
|
||||
@@ -1063,6 +1098,8 @@ ESPHomeDumper.add_multi_representer(
|
||||
)
|
||||
ESPHomeDumper.add_multi_representer(bool, ESPHomeDumper.represent_bool)
|
||||
ESPHomeDumper.add_multi_representer(str, ESPHomeDumper.represent_stringify)
|
||||
# MRO-walked dispatch; SensitiveStr's own entry wins over the str one.
|
||||
ESPHomeDumper.add_multi_representer(SensitiveStr, ESPHomeDumper.represent_sensitive)
|
||||
ESPHomeDumper.add_multi_representer(int, ESPHomeDumper.represent_int)
|
||||
ESPHomeDumper.add_multi_representer(float, ESPHomeDumper.represent_float)
|
||||
ESPHomeDumper.add_multi_representer(_BaseAddress, ESPHomeDumper.represent_stringify)
|
||||
|
||||
@@ -27,6 +27,7 @@ from esphome.const import (
|
||||
SCHEDULER_DONT_RUN,
|
||||
)
|
||||
from esphome.core import CORE, HexInt, Lambda
|
||||
from esphome.yaml_util import SensitiveStr
|
||||
|
||||
|
||||
def test_check_not_templatable__invalid():
|
||||
@@ -145,6 +146,42 @@ def test_sensitive__custom_inner_delegates_validation() -> None:
|
||||
validator(123)
|
||||
|
||||
|
||||
def test_sensitive__wraps_string_result_in_sensitive_str() -> None:
|
||||
validator = config_validation.sensitive()
|
||||
result = validator("hunter2")
|
||||
|
||||
assert isinstance(result, SensitiveStr)
|
||||
assert isinstance(result, str)
|
||||
assert result == "hunter2"
|
||||
|
||||
|
||||
def test_sensitive__does_not_double_tag_already_sensitive() -> None:
|
||||
# If the inner validator already returns a SensitiveStr (e.g., nested
|
||||
# cv.sensitive wrappers), re-tagging is a no-op rather than a new
|
||||
# SensitiveStr around the same value.
|
||||
pre_tagged = SensitiveStr("hunter2")
|
||||
|
||||
def inner(_value):
|
||||
return pre_tagged
|
||||
|
||||
validator = config_validation.sensitive(inner)
|
||||
result = validator("anything")
|
||||
|
||||
assert result is pre_tagged
|
||||
|
||||
|
||||
def test_sensitive__non_string_result_passes_through() -> None:
|
||||
# If an inner validator returns something other than a string (e.g., a
|
||||
# Lambda template), the sensitive wrapper must not coerce it.
|
||||
sentinel = object()
|
||||
|
||||
def inner(_value):
|
||||
return sentinel
|
||||
|
||||
validator = config_validation.sensitive(inner)
|
||||
assert validator("anything") is sentinel
|
||||
|
||||
|
||||
def test_sensitive__is_detectable_via_isinstance() -> None:
|
||||
validator = config_validation.sensitive()
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ from esphome.__main__ import (
|
||||
Purpose,
|
||||
_get_configured_xtal_freq,
|
||||
_make_crystal_freq_callback,
|
||||
_redact_with_legacy_fallback,
|
||||
_resolve_network_devices,
|
||||
_validate_bootloader_binary,
|
||||
_validate_partition_table_binary,
|
||||
@@ -29,6 +30,7 @@ from esphome.__main__ import (
|
||||
command_analyze_memory,
|
||||
command_bundle,
|
||||
command_clean_all,
|
||||
command_config,
|
||||
command_config_hash,
|
||||
command_rename,
|
||||
command_run,
|
||||
@@ -340,6 +342,135 @@ def mock_ram_strings_analyzer() -> Generator[Mock]:
|
||||
yield mock_class
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__wraps_unmarked_field(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Unmarked sensitive-shaped fields are redacted; a deprecation warning
|
||||
is emitted naming the field."""
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback("password: hunter2\n")
|
||||
assert "password: \\033[8mhunter2\\033[28m" in out
|
||||
assert any(
|
||||
"password" in rec.message and "cv.sensitive" in rec.message
|
||||
for rec in caplog.records
|
||||
)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__skips_already_wrapped(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Values already wrapped by the SensitiveStr representer don't trigger
|
||||
the heuristic or the warning."""
|
||||
wrapped = "password: \\033[8mhunter2\\033[28m\n"
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback(wrapped)
|
||||
assert out == wrapped
|
||||
assert not any("legacy substring" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__captures_full_field_name(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""The warning names the actual field, not just the matched fragment."""
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
_redact_with_legacy_fallback("encryption_key: abc\n")
|
||||
assert any("encryption_key" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__deduplicates_warnings(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""One warning per unique field name even if it appears many times."""
|
||||
text = "password: a\npassword: b\npassword: c\n"
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
_redact_with_legacy_fallback(text)
|
||||
password_warnings = [rec for rec in caplog.records if "'password'" in rec.message]
|
||||
assert len(password_warnings) == 1
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__skips_lambda_values(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""``!lambda`` first line is structural, body is unreachable by a
|
||||
single-line regex anyway, and tagged fields shouldn't trigger a warning."""
|
||||
text = ' ssid: !lambda |-\n return "x";\n'
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback(text)
|
||||
assert out == text
|
||||
assert not any("legacy substring" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__skips_secret_references(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""``!secret name`` is the dumper's user-friendly representation; the
|
||||
name isn't the secret, so wrapping it would clobber the round-trip."""
|
||||
text = " password: !secret wifi_password\n"
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback(text)
|
||||
assert out == text
|
||||
assert not any("legacy substring" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__does_not_match_fragment_in_middle(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Fragment must end the field name; embedded matches like
|
||||
``key_value_pair`` are unrelated to a sensitive key and must not be
|
||||
redacted (matching the prior regex's scope)."""
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback("key_value_pair: abc\n")
|
||||
assert "\\033[8m" not in out
|
||||
assert not any("legacy substring" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_redact_with_legacy_fallback__does_not_match_fragment_as_suffix(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Fragment must start the name or follow ``_``; ``monkey:`` shouldn't
|
||||
fire a 'legacy heuristic' warning because there's no sensitive field
|
||||
here — the user has nothing to migrate."""
|
||||
with caplog.at_level(logging.WARNING, logger="esphome.__main__"):
|
||||
out = _redact_with_legacy_fallback("monkey: 1234\n")
|
||||
assert "\\033[8m" not in out
|
||||
assert not any("legacy substring" in rec.message for rec in caplog.records)
|
||||
|
||||
|
||||
def test_command_config__invokes_legacy_fallback_when_redacting(
|
||||
tmp_path: Path, capfd: CaptureFixture[str]
|
||||
) -> None:
|
||||
"""``command_config`` runs the legacy fallback on the dumped output when
|
||||
``--show-secrets`` is off. Cover the wiring (not just the helper).
|
||||
"""
|
||||
setup_core(tmp_path=tmp_path, config={"esphome": {"name": "test"}})
|
||||
args = MockArgs()
|
||||
args.show_secrets = False
|
||||
|
||||
result = command_config(args, {"wifi": {"password": "hunter2"}})
|
||||
|
||||
assert result == 0
|
||||
output = capfd.readouterr().out
|
||||
assert "\\033[8mhunter2\\033[28m" in output
|
||||
|
||||
|
||||
def test_command_config__show_secrets_skips_redaction(
|
||||
tmp_path: Path, capfd: CaptureFixture[str]
|
||||
) -> None:
|
||||
"""With ``--show-secrets`` the helper isn't invoked and the value
|
||||
renders raw.
|
||||
"""
|
||||
setup_core(tmp_path=tmp_path, config={"esphome": {"name": "test"}})
|
||||
args = MockArgs()
|
||||
args.show_secrets = True
|
||||
|
||||
result = command_config(args, {"wifi": {"password": "hunter2"}})
|
||||
|
||||
assert result == 0
|
||||
output = capfd.readouterr().out
|
||||
assert "hunter2" in output
|
||||
assert "\\033[8m" not in output
|
||||
|
||||
|
||||
def test_choose_upload_log_host_with_string_default() -> None:
|
||||
"""Test with a single string default device."""
|
||||
setup_core()
|
||||
|
||||
@@ -15,6 +15,7 @@ from esphome.yaml_util import (
|
||||
DiscoveredYamlFiles,
|
||||
ESPHomeDataBase,
|
||||
ESPLiteralValue,
|
||||
SensitiveStr,
|
||||
discover_user_yaml_files,
|
||||
force_load_include_files,
|
||||
format_path,
|
||||
@@ -1340,3 +1341,57 @@ def test_frontmatter_included_file_stored(tmp_path: Path) -> None:
|
||||
assert main.resolve() not in core.CORE.frontmatter
|
||||
# Included file's frontmatter is captured
|
||||
assert core.CORE.frontmatter[inc.resolve()]["child_meta"] == "hello"
|
||||
|
||||
|
||||
def test_sensitive_str__is_a_str_subclass() -> None:
|
||||
value = SensitiveStr("hunter2")
|
||||
assert isinstance(value, str)
|
||||
assert value == "hunter2"
|
||||
|
||||
|
||||
def test_dump__redacts_sensitive_str_by_default() -> None:
|
||||
out = yaml_util.dump({"password": SensitiveStr("hunter2")})
|
||||
assert "\\033[8mhunter2\\033[28m" in out
|
||||
assert "hunter2" not in out.replace(
|
||||
"\\033[8mhunter2\\033[28m", ""
|
||||
) # the raw value is only present inside the wrap
|
||||
|
||||
|
||||
def test_dump__show_secrets_emits_sensitive_str_raw() -> None:
|
||||
out = yaml_util.dump({"password": SensitiveStr("hunter2")}, show_secrets=True)
|
||||
assert "hunter2" in out
|
||||
assert "\\033[8m" not in out
|
||||
assert "\\033[28m" not in out
|
||||
|
||||
|
||||
def test_dump__plain_str_is_not_redacted() -> None:
|
||||
out = yaml_util.dump({"hostname": "myserver"})
|
||||
assert "myserver" in out
|
||||
assert "\\033[8m" not in out
|
||||
|
||||
|
||||
def test_dump__secret_reference_wins_over_redaction() -> None:
|
||||
# If the value also has an entry in _SECRET_VALUES (i.e., it was loaded
|
||||
# via !secret), the dump should render it as !secret <name>, not as a
|
||||
# redacted scalar. SensitiveStr layered on top must not change that.
|
||||
value = SensitiveStr("hunter2")
|
||||
yaml_util._SECRET_VALUES[str(value)] = "my_secret_name"
|
||||
try:
|
||||
out = yaml_util.dump({"password": value})
|
||||
assert "!secret" in out
|
||||
assert "my_secret_name" in out
|
||||
assert "\\033[8m" not in out
|
||||
finally:
|
||||
yaml_util._SECRET_VALUES.clear()
|
||||
|
||||
|
||||
def test_dump__redaction_flag_does_not_leak_between_calls() -> None:
|
||||
# Per-call _Dumper subclass means show_secrets in one call doesn't
|
||||
# affect another. Run them in both orders to catch any leakage.
|
||||
redacted = yaml_util.dump({"password": SensitiveStr("hunter2")})
|
||||
raw = yaml_util.dump({"password": SensitiveStr("hunter2")}, show_secrets=True)
|
||||
redacted_again = yaml_util.dump({"password": SensitiveStr("hunter2")})
|
||||
|
||||
assert "\\033[8m" in redacted
|
||||
assert "\\033[8m" not in raw
|
||||
assert "\\033[8m" in redacted_again
|
||||
|
||||
Reference in New Issue
Block a user