mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 13:45:15 +00:00
[core] Add cv.sensitive marker for schema-level sensitive fields (#16673)
This commit is contained in:
@@ -487,6 +487,53 @@ def string_strict(value):
|
||||
)
|
||||
|
||||
|
||||
# Substring fallbacks for fields whose validator isn't explicitly wrapped in
|
||||
# ``cv.sensitive``. Frontends and dump tooling should prefer the explicit
|
||||
# marker; this list exists so we still mask obvious leaks in unmigrated or
|
||||
# third-party schemas. Kept here as the single source of truth.
|
||||
SENSITIVE_KEY_FRAGMENTS: frozenset[str] = frozenset(
|
||||
{
|
||||
"password",
|
||||
"passcode",
|
||||
"secret",
|
||||
"token",
|
||||
"api_key",
|
||||
"apikey",
|
||||
"psk",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class SensitiveValidator:
|
||||
"""Marker wrapper that flags a field as containing sensitive data (passwords,
|
||||
encryption keys, PSKs, tokens). Frontends and dump tooling detect this marker
|
||||
to mask the value; validation behavior is delegated to the inner validator.
|
||||
"""
|
||||
|
||||
def __init__(self, inner: Callable[[typing.Any], typing.Any]) -> None:
|
||||
self.inner = inner
|
||||
|
||||
def __call__(self, value: typing.Any) -> typing.Any:
|
||||
return self.inner(value)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
# Mirror the inner validator's repr so ``build_language_schema``'s
|
||||
# ``known_schemas``/``extended_schemas`` dedup (keyed on ``repr(schema)``)
|
||||
# treats two wrappers around the same inner as identical, and so
|
||||
# voluptuous error messages stay readable.
|
||||
return repr(self.inner)
|
||||
|
||||
|
||||
def sensitive(
|
||||
inner: Callable[[typing.Any], typing.Any] = string,
|
||||
) -> SensitiveValidator:
|
||||
"""Mark a field as sensitive so that frontends mask it and dump tooling redacts it.
|
||||
|
||||
Validation behavior is identical to ``inner`` (defaults to ``cv.string``).
|
||||
"""
|
||||
return SensitiveValidator(inner)
|
||||
|
||||
|
||||
def icon(value):
|
||||
"""Validate that a given config value is a valid icon."""
|
||||
from esphome.core.config import ICON_MAX_LENGTH
|
||||
|
||||
@@ -39,7 +39,11 @@ parser.add_argument(
|
||||
)
|
||||
parser.add_argument("--check", action="store_true", help="Check only for CI")
|
||||
|
||||
args = parser.parse_args()
|
||||
# Module-level ``Namespace`` so helper functions can reference ``args``
|
||||
# without threading it through every call. ``main()`` fills it via
|
||||
# ``parser.parse_args(namespace=args)``; tests import this module without
|
||||
# invoking ``main()`` and rely on the defaults below.
|
||||
args = argparse.Namespace(output_path=".", check=False)
|
||||
|
||||
DUMP_RAW = False
|
||||
DUMP_UNKNOWN = False
|
||||
@@ -850,6 +854,12 @@ def convert(schema, config_var, path):
|
||||
convert(ext, config_var, f"{path}/ext{idx}")
|
||||
return
|
||||
|
||||
if isinstance(schema, cv.SensitiveValidator):
|
||||
config_var["sensitive"] = True
|
||||
config_var["sensitive_source"] = "explicit"
|
||||
convert(schema.inner, config_var, f"{path}/sensitive")
|
||||
return
|
||||
|
||||
if isinstance(schema, cv.All):
|
||||
i = 0
|
||||
for inner in schema.validators:
|
||||
@@ -1125,6 +1135,25 @@ def convert_keys(converted, schema, path):
|
||||
|
||||
# Do value
|
||||
convert(v, result, path + f"/{str(k)}")
|
||||
|
||||
# Heuristic fallback when the field's validator wasn't explicitly
|
||||
# wrapped in ``cv.sensitive``. Only applies to string-typed leaves so
|
||||
# we don't mark unrelated nested schemas. ``sensitive_source`` lets
|
||||
# consumers distinguish explicit markers from heuristic matches. Pull
|
||||
# the field name from ``k.schema`` (voluptuous's stored key) rather
|
||||
# than ``str(k)`` so we don't depend on the marker's ``__str__``
|
||||
# representation.
|
||||
if (
|
||||
"sensitive" not in result
|
||||
and result.get(S_TYPE) == "string"
|
||||
and isinstance(k, (cv.Required, cv.Optional, cv.Inclusive, cv.Exclusive))
|
||||
and isinstance(k.schema, str)
|
||||
):
|
||||
key_lower = k.schema.lower()
|
||||
if any(frag in key_lower for frag in cv.SENSITIVE_KEY_FRAGMENTS):
|
||||
result["sensitive"] = True
|
||||
result["sensitive_source"] = "heuristic"
|
||||
|
||||
if "schema" not in converted:
|
||||
converted[S_TYPE] = "schema"
|
||||
converted["schema"] = {S_CONFIG_VARS: {}}
|
||||
@@ -1142,4 +1171,10 @@ def convert_keys(converted, schema, path):
|
||||
config_vars["string"] = config_vars.pop(key)
|
||||
|
||||
|
||||
build_schema()
|
||||
def main() -> None:
|
||||
parser.parse_args(namespace=args)
|
||||
build_schema()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -3,8 +3,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
from esphome import config_validation as cv
|
||||
|
||||
SCRIPT_PATH = (
|
||||
Path(__file__).resolve().parent.parent.parent
|
||||
/ "script"
|
||||
@@ -12,10 +15,16 @@ SCRIPT_PATH = (
|
||||
)
|
||||
|
||||
|
||||
def _load_script_module():
|
||||
spec = importlib.util.spec_from_file_location("build_language_schema", SCRIPT_PATH)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _extract_sort_obj():
|
||||
# build_language_schema.py runs argparse, loads every component, and
|
||||
# calls build_schema() at import time, so a plain import isn't viable
|
||||
# in a unit test. Pull just the pure helper out via AST instead.
|
||||
# ``sort_obj`` is pure and self-contained; pulling it via AST avoids
|
||||
# exercising the module-level component-loading state for these tests.
|
||||
tree = ast.parse(SCRIPT_PATH.read_text())
|
||||
for node in tree.body:
|
||||
if isinstance(node, ast.FunctionDef) and node.name == "sort_obj":
|
||||
@@ -27,6 +36,7 @@ def _extract_sort_obj():
|
||||
|
||||
|
||||
sort_obj = _extract_sort_obj()
|
||||
_bls = _load_script_module()
|
||||
|
||||
|
||||
def test_sort_obj_sorts_dict_keys() -> None:
|
||||
@@ -96,3 +106,56 @@ def test_sort_obj_passes_through_scalars() -> None:
|
||||
assert sort_obj(42) == 42
|
||||
assert sort_obj(None) is None
|
||||
assert sort_obj(True) is True
|
||||
|
||||
|
||||
def test_convert_emits_explicit_sensitive_marker() -> None:
|
||||
config_var: dict = {}
|
||||
_bls.convert(cv.sensitive(cv.string), config_var, "/test")
|
||||
|
||||
assert config_var["sensitive"] is True
|
||||
assert config_var["sensitive_source"] == "explicit"
|
||||
assert config_var["type"] == "string"
|
||||
|
||||
|
||||
def test_convert_keys_emits_heuristic_sensitive_marker() -> None:
|
||||
converted: dict = {}
|
||||
_bls.convert_keys(converted, {cv.Optional("password"): cv.string}, "/root")
|
||||
|
||||
entry = converted["schema"]["config_vars"]["password"]
|
||||
assert entry["sensitive"] is True
|
||||
assert entry["sensitive_source"] == "heuristic"
|
||||
assert entry["type"] == "string"
|
||||
|
||||
|
||||
def test_convert_keys_explicit_beats_heuristic() -> None:
|
||||
# Key name matches a fragment but the validator is explicitly wrapped;
|
||||
# the explicit branch should win and emit ``sensitive_source: explicit``.
|
||||
converted: dict = {}
|
||||
_bls.convert_keys(
|
||||
converted, {cv.Optional("password"): cv.sensitive(cv.string)}, "/root"
|
||||
)
|
||||
|
||||
entry = converted["schema"]["config_vars"]["password"]
|
||||
assert entry["sensitive"] is True
|
||||
assert entry["sensitive_source"] == "explicit"
|
||||
|
||||
|
||||
def test_convert_keys_no_heuristic_for_non_string_leaves() -> None:
|
||||
# Even though the key contains a fragment, a non-string leaf must not
|
||||
# be flagged. Prevents false positives on unrelated fields whose name
|
||||
# happens to embed a substring like "token".
|
||||
converted: dict = {}
|
||||
_bls.convert_keys(converted, {cv.Optional("password"): cv.boolean}, "/root")
|
||||
|
||||
entry = converted["schema"]["config_vars"]["password"]
|
||||
assert "sensitive" not in entry
|
||||
assert "sensitive_source" not in entry
|
||||
|
||||
|
||||
def test_convert_keys_no_marker_for_non_sensitive_field() -> None:
|
||||
converted: dict = {}
|
||||
_bls.convert_keys(converted, {cv.Optional("hostname"): cv.string}, "/root")
|
||||
|
||||
entry = converted["schema"]["config_vars"]["hostname"]
|
||||
assert "sensitive" not in entry
|
||||
assert "sensitive_source" not in entry
|
||||
|
||||
@@ -127,6 +127,49 @@ def test_string_string__invalid(value):
|
||||
config_validation.string_strict(value)
|
||||
|
||||
|
||||
def test_sensitive__default_delegates_to_string() -> None:
|
||||
validator = config_validation.sensitive()
|
||||
|
||||
assert isinstance(validator, config_validation.SensitiveValidator)
|
||||
assert validator.inner is config_validation.string
|
||||
assert validator("hunter2") == "hunter2"
|
||||
assert validator(42) == "42"
|
||||
|
||||
|
||||
def test_sensitive__custom_inner_delegates_validation() -> None:
|
||||
validator = config_validation.sensitive(config_validation.string_strict)
|
||||
|
||||
assert validator.inner is config_validation.string_strict
|
||||
assert validator("abc") == "abc"
|
||||
with pytest.raises(Invalid, match="Must be string, got"):
|
||||
validator(123)
|
||||
|
||||
|
||||
def test_sensitive__is_detectable_via_isinstance() -> None:
|
||||
validator = config_validation.sensitive()
|
||||
|
||||
assert isinstance(validator, config_validation.SensitiveValidator)
|
||||
|
||||
|
||||
def test_sensitive__repr_mirrors_inner() -> None:
|
||||
# The schema dump dedups on ``repr(schema)``; mirroring the inner
|
||||
# validator's repr keeps two ``cv.sensitive(cv.string)`` wrappers
|
||||
# interchangeable for that purpose and avoids leaking the wrapper as
|
||||
# noise in voluptuous error messages.
|
||||
assert repr(config_validation.sensitive(config_validation.string)) == repr(
|
||||
config_validation.string
|
||||
)
|
||||
assert repr(config_validation.sensitive(config_validation.string)) == repr(
|
||||
config_validation.sensitive(config_validation.string)
|
||||
)
|
||||
|
||||
|
||||
def test_sensitive_key_fragments__covers_common_terms() -> None:
|
||||
assert isinstance(config_validation.SENSITIVE_KEY_FRAGMENTS, frozenset)
|
||||
for term in ("password", "passcode", "secret", "token", "api_key", "apikey", "psk"):
|
||||
assert term in config_validation.SENSITIVE_KEY_FRAGMENTS
|
||||
|
||||
|
||||
@given(
|
||||
builds(
|
||||
lambda v: "mdi:" + v,
|
||||
|
||||
Reference in New Issue
Block a user