From 9cebce1b6ecefcdb6d2c8fa3b6b854f08c24484e Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Wed, 22 Apr 2026 03:19:01 +0200 Subject: [PATCH] [substitutions] Improve error messages with include stack trace (#15874) Co-authored-by: J. Nick Koston --- esphome/components/packages/__init__.py | 64 +++++-- esphome/components/substitutions/__init__.py | 83 +++----- esphome/yaml_util.py | 119 ++++++++++++ .../component_tests/packages/test_packages.py | 24 ++- tests/unit_tests/test_substitutions.py | 47 ++++- tests/unit_tests/test_yaml_util.py | 181 +++++++++++++++++- 6 files changed, 432 insertions(+), 86 deletions(-) diff --git a/esphome/components/packages/__init__.py b/esphome/components/packages/__init__.py index 97a5309480..b6ec0067c9 100644 --- a/esphome/components/packages/__init__.py +++ b/esphome/components/packages/__init__.py @@ -42,6 +42,11 @@ DOMAIN = CONF_PACKAGES # Guard against infinite include chains (e.g. A includes B includes A). MAX_INCLUDE_DEPTH = 20 +PackageCallback = Callable[ + [dict | str | yaml_util.IncludeFile, ContextVars | None, yaml_util.DocumentPath], + dict, +] + def is_remote_package(package_config: dict) -> bool: """Returns True if the package_config is a remote package definition.""" @@ -281,8 +286,9 @@ def _process_remote_package(config: dict, skip_update: bool = False) -> dict: def _walk_package_dict( packages: dict, - callback: Callable[[dict, ContextVars | None], dict], + callback: PackageCallback, context: ContextVars | None, + path: yaml_util.DocumentPath, ) -> cv.Invalid | None: """Iterate a packages dict in reverse priority order, invoking callback on each entry. @@ -291,7 +297,9 @@ def _walk_package_dict( for package_name, package_config in reversed(packages.items()): with cv.prepend_path(package_name): try: - packages[package_name] = callback(package_config, context) + packages[package_name] = callback( + package_config, context, path + [package_name] + ) except cv.Invalid as err: return err return None @@ -299,20 +307,22 @@ def _walk_package_dict( def _walk_package_list( packages: list, - callback: Callable[[dict, ContextVars | None], dict], + callback: PackageCallback, context: ContextVars | None, + path: yaml_util.DocumentPath, ) -> None: """Iterate a packages list in reverse priority order, invoking callback on each entry.""" for idx in reversed(range(len(packages))): with cv.prepend_path(idx): - packages[idx] = callback(packages[idx], context) + packages[idx] = callback(packages[idx], context, path + [idx]) def _walk_packages( config: dict, - callback: Callable[[dict, ContextVars | None], dict], + callback: PackageCallback, context: ContextVars | None = None, validate_deprecated: bool = True, + path: yaml_util.DocumentPath | None = None, ) -> dict: """Walks the packages structure in priority order, invoking ``callback`` on each package definition found. @@ -323,19 +333,24 @@ def _walk_packages( if CONF_PACKAGES not in config: return config packages = config[CONF_PACKAGES] + packages_path = (path or []) + [CONF_PACKAGES] with cv.prepend_path(CONF_PACKAGES): if isinstance(packages, yaml_util.IncludeFile): # If the packages key is an IncludeFile, resolve it first before processing. - packages, _ = resolve_include(packages, [], context, strict_undefined=False) + packages = resolve_include( + packages, packages_path, context, strict_undefined=False + ) if not isinstance(packages, (dict, list)): raise cv.Invalid( f"Packages must be a key to value mapping or list, got {type(packages)} instead" ) if not isinstance(packages, dict): - _walk_package_list(packages, callback, context) - elif (result := _walk_package_dict(packages, callback, context)) is not None: + _walk_package_list(packages, callback, context, packages_path) + elif ( + result := _walk_package_dict(packages, callback, context, packages_path) + ) is not None: if not validate_deprecated or any( is_package_definition(v) for v in packages.values() ): @@ -344,14 +359,18 @@ def _walk_packages( # This block can be removed once the single-package # deprecation period (2026.7.0) is over. config[CONF_PACKAGES] = [packages] - return _walk_packages(deprecate_single_package(config), callback, context) + return _walk_packages( + deprecate_single_package(config), callback, context, path=path + ) config[CONF_PACKAGES] = packages return config def _substitute_package_definition( - package_config: dict | str, context_vars: ContextVars | None + package_config: dict | str, + context_vars: ContextVars | None, + path: yaml_util.DocumentPath | None = None, ) -> dict | str: """Substitute variables in a package definition string or remote package dict. @@ -369,12 +388,12 @@ def _substitute_package_definition( errors: ErrList = [] package_config = substitute( item=package_config, - path=[], + path=path or [], parent_context=context_vars or ContextVars(), strict_undefined=False, errors=errors, ) - raise_first_undefined(errors, package_config, "package definition") + raise_first_undefined(errors, "package definition") return package_config @@ -432,6 +451,7 @@ class _PackageProcessor: self, package_config: dict | str | yaml_util.IncludeFile, context_vars: ContextVars | None, + path: yaml_util.DocumentPath, ) -> dict: """Resolve a package definition to a concrete ``dict`` and fetch remote packages. @@ -454,15 +474,15 @@ class _PackageProcessor: """ for _ in range(MAX_INCLUDE_DEPTH): if isinstance(package_config, yaml_util.IncludeFile): - package_config, _ = resolve_include( + package_config = resolve_include( package_config, - [], + path, context_vars or ContextVars(), strict_undefined=False, ) package_config = _substitute_package_definition( - package_config, context_vars + package_config, context_vars, path ) package_config = PACKAGE_SCHEMA(package_config) if isinstance(package_config, dict): @@ -483,13 +503,16 @@ class _PackageProcessor: _update_substitutions_context(self.parent_context, subs) def process_package( - self, package_config: dict | str, context_vars: ContextVars | None + self, + package_config: dict | str, + context_vars: ContextVars | None, + path: yaml_util.DocumentPath, ) -> dict: """Resolve a single package and recurse into any nested packages.""" from_remote = isinstance(package_config, dict) and is_remote_package( package_config ) - package_config = self.resolve_package(package_config, context_vars) + package_config = self.resolve_package(package_config, context_vars, path) self.collect_substitutions(package_config) if CONF_PACKAGES not in package_config: @@ -509,6 +532,7 @@ class _PackageProcessor: self.process_package, context_vars, validate_deprecated=not from_remote, + path=path, ) @@ -565,11 +589,13 @@ def merge_packages(config: dict) -> dict: merge_list: list[dict] = [] def process_package_callback( - package_config: dict, context: ContextVars | None + package_config: dict, + context: ContextVars | None, + path: yaml_util.DocumentPath | None = None, ) -> dict: """This will be called for each package found in the config.""" merge_list.append(package_config) - return _walk_packages(package_config, process_package_callback) + return _walk_packages(package_config, process_package_callback, path=path) _walk_packages(config, process_package_callback, validate_deprecated=False) # Merge all packages into the main config: diff --git a/esphome/components/substitutions/__init__.py b/esphome/components/substitutions/__init__.py index 8bbccffca1..fb7cd7c51b 100644 --- a/esphome/components/substitutions/__init__.py +++ b/esphome/components/substitutions/__init__.py @@ -11,9 +11,11 @@ from esphome.types import ConfigType from esphome.util import OrderedDict from esphome.yaml_util import ( ConfigContext, + DocumentPath, ESPHomeDataBase, ESPLiteralValue, IncludeFile, + format_path, make_data_base, ) @@ -23,8 +25,8 @@ CODEOWNERS = ["@esphome/core"] _LOGGER = logging.getLogger(__name__) ContextVars = ChainMap[str, Any] -SubstitutionPath = list[int | str] -ErrList = list[tuple[UndefinedError, SubstitutionPath, Any]] +ErrList = list[tuple[UndefinedError, DocumentPath, Any]] + # Module-level instance is safe: context_vars is passed per-call, and context_trace # is stack-saved/restored within expand(). Not thread-safe — only use from one thread. jinja = Jinja() @@ -32,16 +34,13 @@ jinja = Jinja() def raise_first_undefined( errors: ErrList, - source: Any, context_label: str, ) -> None: """If *errors* is non-empty, raise ``cv.Invalid`` for the first undefined variable. - The raised error names the missing variable, the path walked into *source* - (for nested dicts, e.g. ``url`` or ``ref``), and the YAML source location - when *source* carries one. Only the first error is surfaced; the user will - re-run after fixing it and any remaining undefined variables will be - reported then. + The raised error names the missing variable and its location in the include + stack. Only the first error is surfaced; the user will re-run after fixing it + and any remaining undefined variables will be reported then. ``context_label`` is the noun describing where the undefined variable appeared (e.g. ``"package definition"``). @@ -57,26 +56,8 @@ def raise_first_undefined( for e, p_path, _ in errors[1:] ) _LOGGER.debug("Additional undefined variables in %s: %s", context_label, extras) - # Prefer the location of the offending scalar (e.g. the `url:` value) over - # the enclosing package-definition dict so the message points at the exact - # line/column that carries the undefined variable. - location_node = ( - err_value - if isinstance(err_value, ESPHomeDataBase) and err_value.esp_range is not None - else source - ) - location = "" - if ( - isinstance(location_node, ESPHomeDataBase) - and location_node.esp_range is not None - ): - mark = location_node.esp_range.start_mark - # DocumentLocation.line/column are 0-based (from the YAML Mark). Render - # as 1-based to match config.line_info() and editor line numbering. - location = f" (in {mark.document} {mark.line + 1}:{mark.column + 1})" - field = f" at '{'->'.join(str(p) for p in err_path)}'" if err_path else "" raise cv.Invalid( - f"Undefined variable in {context_label}{field}: {err.message}{location}" + f"Undefined variable in {context_label}: {err.message}\n{format_path(err_path, err_value)}" ) @@ -145,7 +126,7 @@ def _resolve_var(name: str, context_vars: ContextVars) -> Any: def _handle_undefined( err: UndefinedError, - path: SubstitutionPath, + path: DocumentPath, value: Any, strict_undefined: bool, errors: ErrList | None, @@ -163,7 +144,7 @@ def _handle_undefined( def _expand_substitutions( value: str, - path: SubstitutionPath, + path: DocumentPath, context_vars: ContextVars, strict_undefined: bool, errors: ErrList | None, @@ -236,7 +217,7 @@ def _expand_substitutions( f"\nEvaluation stack: (most recent evaluation last)" f"\n{err.stack_trace_str()}" f"\nRelevant context:\n{err.context_trace_str()}" - f"\nSee {'->'.join(str(x) for x in path)}", + f"\n{format_path(path, orig_value)}", path, ) from err else: @@ -345,15 +326,13 @@ def push_context( def resolve_include( include: IncludeFile, - path: list[int | str], + path: DocumentPath, context_vars: ContextVars, strict_undefined: bool = True, errors: ErrList | None = None, -) -> tuple[Any, str]: +) -> Any: """Resolve an include, substituting the filename if needed. - Returns the loaded content and the resolved filename. - Note: no path-traversal validation is performed on the resolved filename. A substitution that resolves to an absolute path will bypass the parent directory (Path.__truediv__ ignores the left operand for absolute paths). @@ -361,44 +340,44 @@ def resolve_include( values (including command-line substitutions), so path restrictions are an explicit non-goal here. """ - original = str(include.file) + original = include.file + original_str = str(original) filename = str( _expand_substitutions( - original, path + ["file"], context_vars, strict_undefined, errors + original_str, path + ["file"], context_vars, strict_undefined, errors ) ) - if filename != original: + substituted = filename != original_str + if substituted: include = IncludeFile( include.parent_file, filename, include.vars, include.yaml_loader ) try: - return include.load(), filename + return include.load() except esphome.core.EsphomeError as err: + resolved = f" (expanded from '{original}')" if substituted else "" raise cv.Invalid( - f"Error including file '{filename}': {err}", + f"Error including file '{filename}'{resolved}: {err}" + f"\n{format_path(path, original)}", path + [f"<{filename}>"], ) from err def _substitute_include( include: IncludeFile, - path: list[int | str], + path: DocumentPath, context_vars: ContextVars, strict_undefined: bool, errors: ErrList | None, ) -> Any: """Resolve an include and substitute its content.""" - content, filename = resolve_include( - include, path, context_vars, strict_undefined, errors - ) - return substitute( - content, path + [f"<{filename}>"], context_vars, strict_undefined, errors - ) + content = resolve_include(include, path, context_vars, strict_undefined, errors) + return substitute(content, path, context_vars, strict_undefined, errors) def substitute( item: Any, - path: SubstitutionPath, + path: DocumentPath, parent_context: ContextVars, strict_undefined: bool, errors: ErrList | None = None, @@ -451,16 +430,12 @@ def _warn_unresolved_variables(errors: ErrList) -> None: for err, path, expression in errors: if "password" in path: continue - location: str = "->".join(str(x) for x in path) - if isinstance(expression, ESPHomeDataBase) and expression.esp_range is not None: - location += f" in {str(expression.esp_range.start_mark)}" - _LOGGER.warning( "The string '%s' looks like an expression," - " but could not resolve all the variables: %s (see %s)", + " but could not resolve all the variables: %s\n%s", expression, err.message, - location, + format_path(path, expression), ) @@ -479,7 +454,7 @@ def resolve_substitutions_block( # Single-shot resolution — matches ``_walk_packages`` for the # ``packages: !include`` entry point. Chained includes (an include that # itself loads another ``!include`` at the top level) are not supported. - substitutions, _ = resolve_include( + substitutions = resolve_include( substitutions, [], ContextVars(command_line_substitutions or {}), diff --git a/esphome/yaml_util.py b/esphome/yaml_util.py index e15adff935..42da27ec14 100644 --- a/esphome/yaml_util.py +++ b/esphome/yaml_util.py @@ -48,6 +48,8 @@ _SECRET_VALUES = {} # Not thread-safe — config processing is single-threaded today. _load_listeners: list[Callable[[Path], None]] = [] +DocumentPath = list[str | int] + @contextmanager def track_yaml_loads() -> Generator[list[Path]]: @@ -679,6 +681,123 @@ def is_secret(value): return None +def _path_doc(item: Any) -> str | None: + """Return the source document name if *item* carries location info.""" + if isinstance(item, ESPHomeDataBase) and (r := item.esp_range) is not None: + return r.start_mark.document + return None + + +def _fmt_mark(loc: Any) -> str: + """Render a DocumentLocation as a 1-based 'file line:col' string.""" + return f"{loc.document} {loc.line + 1}:{loc.column + 1}" + + +def _obj_loc(obj: Any) -> str: + """Return formatted source location for *obj*, or '' if it has none.""" + if isinstance(obj, ESPHomeDataBase) and (r := obj.esp_range) is not None: + return _fmt_mark(r.start_mark) + return "" + + +def _fmt_segment(seg: list) -> str: + """Format a path segment, rendering integers as [n] subscripts.""" + parts: list[str] = [] + for item in seg: + if isinstance(item, int): + if parts: + parts[-1] = f"{parts[-1]}[{item}]" + else: + parts.append(f"[{item}]") + else: + parts.append(str(item)) + return "->".join(parts) + + +def _split_into_frames( + path: DocumentPath, +) -> list[tuple[list, str]]: + """Group *path* into per-file frames at include boundaries. + + A "frame" is the slice of the path that belongs to one source document. + Each path item is either: + + * a **located key** — has an ``ESPHomeDataBase`` source mark; this is + what tells us which document owns the surrounding keys. + * an **integer** — a list subscript; always attaches to the open frame + (renders as ``foo[3]`` on the previous name). + * an **unlocated string** — a key with no source mark (e.g. constants + like ``CONF_PACKAGES``); it describes the parent of the *next* file, + so it migrates to the next frame when the document changes. + + Returns a list of ``(items, "file line:col")`` tuples in walk order + (outermost frame first). + """ + frames: list[tuple[list, str]] = [] + open_frame: list = [] + next_frame_keys: list = [] # unlocated strings buffered for the next frame + open_doc: str | None = None + open_loc = "" + + for item in path: + doc = _path_doc(item) + if doc is None: + # Ints subscript the open frame's last name; everything else + # (strings, or leading ints with no open frame) is buffered for + # the next frame. + if isinstance(item, int) and open_doc is not None: + open_frame.append(item) + else: + next_frame_keys.append(item) + continue + if open_doc is not None and doc != open_doc: + # Crossed an include boundary: close the open frame. + frames.append((open_frame, open_loc)) + open_frame = [] + open_frame.extend(next_frame_keys) + next_frame_keys.clear() + open_frame.append(item) + open_doc = doc + open_loc = _fmt_mark(item.esp_range.start_mark) + + if open_doc is not None: + # Trailing buffered keys belong to the innermost (last) frame. + open_frame.extend(next_frame_keys) + frames.append((open_frame, open_loc)) + return frames + + +def format_path(path: DocumentPath, current_obj: Any) -> str: + """Build a human-readable include stack from a config path. + + Each YAML key in *path* that carries an ``ESPHomeDataBase`` ``esp_range`` + reveals which file it came from. When the source document changes between + consecutive such keys, that is an include boundary. The path is split + into per-file frames and formatted innermost-first, e.g.:: + + In: packages->roam in common/package/wifi.yaml 26:10 + Included from packages->net in common/hardware.yaml 44:2 + Included from packages->device in my_project.yaml 11:2 + + The innermost ``In:`` line uses the location from *current_obj* when + available (the value that triggered the error) for extra precision. + """ + frames = _split_into_frames(path) + obj_loc = _obj_loc(current_obj) + + if not frames: + # No source info anywhere in the path: render as a flat path, + # using current_obj's location if it happens to have one. + suffix = f" in {obj_loc}" if obj_loc else "" + return f"In: {_fmt_segment(path)}{suffix}" + + inner_seg, inner_loc = frames[-1] + lines = [f"In: {_fmt_segment(inner_seg)} in {obj_loc or inner_loc}"] + for seg, loc in reversed(frames[:-1]): + lines.append(f" Included from {_fmt_segment(seg)} in {loc}") + return "\n".join(lines) + + class ESPHomeDumper(yaml.SafeDumper): def represent_mapping(self, tag, mapping, flow_style=None): value = [] diff --git a/tests/component_tests/packages/test_packages.py b/tests/component_tests/packages/test_packages.py index 0bd339efa9..af4b6db796 100644 --- a/tests/component_tests/packages/test_packages.py +++ b/tests/component_tests/packages/test_packages.py @@ -46,7 +46,7 @@ from esphome.const import ( ) from esphome.core import CORE from esphome.util import OrderedDict -from esphome.yaml_util import IncludeFile, add_context, load_yaml +from esphome.yaml_util import DocumentPath, IncludeFile, add_context, load_yaml # Test strings TEST_DEVICE_NAME = "test_device_name" @@ -1113,7 +1113,7 @@ def test_packages_include_file_resolves_to_list(mock_resolve_include) -> None: """When packages: is an IncludeFile that resolves to a list, it is processed correctly.""" include_file = MagicMock(spec=IncludeFile) package_content = {CONF_WIFI: {CONF_SSID: TEST_PACKAGE_WIFI_SSID}} - mock_resolve_include.return_value = ([package_content], None) + mock_resolve_include.return_value = [package_content] config = {CONF_PACKAGES: include_file} result = do_packages_pass(config) @@ -1127,7 +1127,7 @@ def test_packages_include_file_resolves_to_dict(mock_resolve_include) -> None: """When packages: is an IncludeFile that resolves to a dict, it is processed correctly.""" include_file = MagicMock(spec=IncludeFile) package_content = {CONF_WIFI: {CONF_SSID: TEST_PACKAGE_WIFI_SSID}} - mock_resolve_include.return_value = ({"network": package_content}, None) + mock_resolve_include.return_value = {"network": package_content} config = {CONF_PACKAGES: include_file} result = do_packages_pass(config) @@ -1142,7 +1142,7 @@ def test_packages_include_file_resolves_to_invalid_type_raises( ) -> None: """When packages: is an IncludeFile that resolves to an invalid type, cv.Invalid is raised.""" include_file = MagicMock(spec=IncludeFile) - mock_resolve_include.return_value = ("not_a_dict_or_list", None) + mock_resolve_include.return_value = "not_a_dict_or_list" config = {CONF_PACKAGES: include_file} with pytest.raises( @@ -1215,7 +1215,9 @@ def test_named_dict_with_include_files_no_false_deprecation_warning( call_count = 0 - def failing_callback(package_config: dict, context: object) -> dict: + def failing_callback( + package_config: dict, context: object, path: DocumentPath | None = None + ) -> dict: nonlocal call_count call_count += 1 if call_count == 1: @@ -1251,7 +1253,9 @@ def test_validate_deprecated_false_raises_directly( call_count = 0 - def failing_callback(package_config: dict, context: object) -> dict: + def failing_callback( + package_config: dict, context: object, path: DocumentPath | None = None + ) -> dict: nonlocal call_count call_count += 1 if call_count == 1: @@ -1283,7 +1287,9 @@ def test_error_on_first_declared_package_still_detected() -> None: call_count = 0 - def fail_on_last(package_config: dict, context: object) -> dict: + def fail_on_last( + package_config: dict, context: object, path: DocumentPath | None = None + ) -> dict: nonlocal call_count call_count += 1 # Reverse iteration: third_pkg (1), second_pkg (2), first_pkg (3) @@ -1312,7 +1318,9 @@ def test_deprecated_single_package_fallback_still_works( attempt = 0 - def fail_then_succeed(package_config: dict, context: object) -> dict: + def fail_then_succeed( + package_config: dict, context: object, path: DocumentPath | None = None + ) -> dict: nonlocal attempt attempt += 1 if attempt == 1: diff --git a/tests/unit_tests/test_substitutions.py b/tests/unit_tests/test_substitutions.py index 3599e703d9..215ec291f9 100644 --- a/tests/unit_tests/test_substitutions.py +++ b/tests/unit_tests/test_substitutions.py @@ -659,7 +659,7 @@ def test_resolve_package_max_depth_exceeded(tmp_path: Path) -> None: cv.Invalid, match=f"Maximum include nesting depth \\({MAX_INCLUDE_DEPTH}\\) exceeded", ): - processor.resolve_package(package_config, substitutions.ContextVars()) + processor.resolve_package(package_config, substitutions.ContextVars(), []) def test_include_filename_substitution_undefined_var(tmp_path: Path) -> None: @@ -690,7 +690,7 @@ def test_raise_first_undefined_logs_extras_at_debug( caplog.at_level(logging.DEBUG, logger="esphome.components.substitutions"), pytest.raises(cv.Invalid) as exc_info, ): - substitutions.raise_first_undefined(errors, None, "package definition") + substitutions.raise_first_undefined(errors, "package definition") # First error is surfaced as the cv.Invalid message. raised = str(exc_info.value) @@ -706,7 +706,7 @@ def test_raise_first_undefined_logs_extras_at_debug( def test_raise_first_undefined_noop_on_empty() -> None: """An empty errors list is a no-op — no exception, no log.""" - substitutions.raise_first_undefined([], None, "package definition") + substitutions.raise_first_undefined([], "package definition") def test_do_substitution_pass_included_substitutions_must_be_mapping( @@ -778,4 +778,43 @@ def test_resolve_package_undefined_var_in_include_filename(tmp_path: Path) -> No ) processor = _PackageProcessor({}, None, False) with pytest.raises(cv.Invalid, match="unresolved substitutions"): - processor.resolve_package(package_config, substitutions.ContextVars()) + processor.resolve_package(package_config, substitutions.ContextVars(), []) + + +def test_resolve_include_error_shows_expanded_from_when_substituted( + tmp_path: Path, +) -> None: + """When a substituted filename fails to load, the error includes '(expanded from ...)'.""" + parent = tmp_path / "main.yaml" + parent.write_text("") + + def failing_loader(_path: Path) -> None: + raise EsphomeError("File not found") + + include = yaml_util.IncludeFile(parent, "${device}.yaml", None, failing_loader) + context = substitutions.ContextVars({"device": "my_device"}) + + with pytest.raises(cv.Invalid) as exc_info: + substitutions.resolve_include(include, [], context) + + msg = str(exc_info.value) + assert "my_device.yaml" in msg + assert "expanded from '${device}.yaml'" in msg + + +def test_resolve_include_error_no_expanded_from_for_literal_filename( + tmp_path: Path, +) -> None: + """When a literal filename fails to load, the error has no 'expanded from' clause.""" + parent = tmp_path / "main.yaml" + parent.write_text("") + + def failing_loader(_path: Path) -> None: + raise EsphomeError("File not found") + + include = yaml_util.IncludeFile(parent, "literal.yaml", None, failing_loader) + + with pytest.raises(cv.Invalid) as exc_info: + substitutions.resolve_include(include, [], substitutions.ContextVars()) + + assert "expanded from" not in str(exc_info.value) diff --git a/tests/unit_tests/test_yaml_util.py b/tests/unit_tests/test_yaml_util.py index bfd60de44d..e3aa2a16f5 100644 --- a/tests/unit_tests/test_yaml_util.py +++ b/tests/unit_tests/test_yaml_util.py @@ -9,8 +9,9 @@ from esphome import core, yaml_util from esphome.components import substitutions from esphome.config_helpers import Extend, Remove import esphome.config_validation as cv -from esphome.core import EsphomeError +from esphome.core import DocumentLocation, DocumentRange, EsphomeError from esphome.util import OrderedDict +from esphome.yaml_util import ESPHomeDataBase, format_path, make_data_base @pytest.fixture(autouse=True) @@ -712,3 +713,181 @@ def test_yaml_merge_chain_include_depth_exceeded() -> None: yaml_text = "base:\n <<: !include loop.yaml\n" with pytest.raises(EsphomeError, match="Maximum include chain depth"): yaml_util.parse_yaml(parent, io.StringIO(yaml_text), self_referencing_loader) + + +def _located(value, doc: str, line: int, col: int): + """Return *value* wrapped with a fake ESPHomeDataBase source location.""" + loc = DocumentLocation(doc, line, col) + obj = make_data_base(value) + if isinstance(obj, ESPHomeDataBase): + obj._esp_range = DocumentRange(loc, loc) + return obj + + +def test_format_path_no_location_info_returns_flat_path(): + """Plain path items with no esp_range produce a simple flat 'In:' line.""" + result = format_path(["wifi", "ssid"], None) + assert result == "In: wifi->ssid" + + +def test_format_path_no_location_info_current_obj_adds_file(): + """When path has no location but current_obj does, its location is shown.""" + obj = _located("${var}", "main.yaml", 5, 10) + result = format_path(["wifi", "ssid"], obj) + assert result == "In: wifi->ssid in main.yaml 6:11" + + +def test_format_path_single_frame_no_include_boundary(): + """All located keys from the same document → single 'In:' line, no 'Included from'.""" + path = ["packages", _located("pkg1", "root.yaml", 5, 2)] + result = format_path(path, None) + assert result.startswith("In: packages->pkg1 in root.yaml 6:3") + assert "Included from" not in result + + +def test_format_path_two_frames_shows_included_from(): + """Keys from two different documents produce 'In:' + one 'Included from' line.""" + path = [ + "packages", + _located("device", "root.yaml", 10, 2), + "packages", + _located("inner", "hardware.yaml", 3, 2), + ] + result = format_path(path, None) + assert "In: packages->inner in hardware.yaml 4:3" in result + assert "Included from packages->device in root.yaml 11:3" in result + + +def test_format_path_three_frames_full_include_stack(): + """Three document levels produce two 'Included from' lines in correct order.""" + path = [ + "packages", + _located("device", "root.yaml", 10, 2), + "packages", + _located("_wifi_", "hardware.yaml", 43, 2), + "packages", + _located("_roam_", "wifi.yaml", 25, 2), + ] + result = format_path(path, None) + lines = result.splitlines() + assert lines[0].startswith("In: packages->_roam_ in wifi.yaml") + assert lines[1].startswith(" Included from packages->_wifi_ in hardware.yaml") + assert lines[2].startswith(" Included from packages->device in root.yaml") + + +def test_format_path_current_obj_overrides_innermost_location(): + """current_obj's esp_range replaces the key's column for the 'In:' line.""" + path = ["packages", _located("pkg1", "root.yaml", 5, 2)] + # Value (the expression) sits at column 10, not column 2 like the key + value = _located("${undefined}", "root.yaml", 5, 10) + result = format_path(path, value) + assert "6:11" in result + assert "6:3" not in result + + +def test_format_path_empty_path_with_no_location(): + """Empty path with no location info returns 'In: '.""" + result = format_path([], None) + assert result == "In: " + + +def test_format_path_integer_path_items_formatted_as_subscript(): + """Integer indices are rendered as [n] subscripts in the flat fallback.""" + result = format_path(["packages", 0], None) + assert result == "In: packages[0]" + + +def test_format_path_integer_list_index_attached_to_previous_frame(): + """A list index between two include boundaries attaches to the outer frame.""" + path = [ + "packages", + _located("packages", "main.yaml", 5, 0), + 0, + _located("packages", "level1.yaml", 2, 0), + 0, + _located("esphome", "level2.yaml", 0, 0), + _located("name", "level2.yaml", 1, 8), + ] + result = format_path(path, None) + lines = result.splitlines() + assert lines[0].startswith("In: esphome->name in level2.yaml") + assert "packages[0]" in lines[1] and "level1.yaml" in lines[1] + assert "packages[0]" in lines[2] and "main.yaml" in lines[2] + + +def test_format_path_trailing_unlocated_string_after_located_key(): + """Plain string keys after the last located key must still appear in output.""" + path = [_located("packages", "main.yaml", 5, 0), "sub", "key"] + result = format_path(path, None) + assert result == "In: packages->sub->key in main.yaml 6:1" + + +def test_format_path_trailing_unlocated_int_attaches_to_current_frame(): + """Trailing ints attach to the open frame's last key (subscript), strings + buffer until end-of-path and then flush behind.""" + path = [_located("packages", "main.yaml", 5, 0), 0, "sub"] + result = format_path(path, None) + # Int attaches to 'packages' as [0] subscript; trailing 'sub' is flushed + # at end and appears after. + assert result == "In: packages[0]->sub in main.yaml 6:1" + + +def test_format_path_only_trailing_unlocated_strings_are_preserved(): + """Trailing pending items must not be silently dropped after the last frame.""" + path = [ + _located("packages", "main.yaml", 5, 0), + _located("inner", "hardware.yaml", 3, 0), + "tail1", + "tail2", + ] + result = format_path(path, None) + lines = result.splitlines() + assert lines[0] == "In: inner->tail1->tail2 in hardware.yaml 4:1" + assert lines[1] == " Included from packages in main.yaml 6:1" + + +def test_format_path_leading_int_with_no_current_doc_goes_to_pending(): + """An int before any located key is buffered and shown in the first frame.""" + path = [0, _located("name", "main.yaml", 1, 0)] + result = format_path(path, None) + # Leading ints have no preceding name to subscript onto, so they render + # as bare [n] in the formatted segment. + assert result == "In: [0]->name in main.yaml 2:1" + + +def test_format_path_only_unlocated_int_returns_flat_fallback(): + """Path with only an int and no location info renders via the flat fallback.""" + result = format_path([0], None) + assert result == "In: [0]" + + +def test_format_path_current_obj_in_different_doc_than_innermost_frame(): + """current_obj's location is preferred even when its document differs from the frame's.""" + path = [_located("packages", "root.yaml", 1, 0)] + value = _located("${var}", "other.yaml", 9, 4) + result = format_path(path, value) + # Innermost line uses current_obj's mark (other.yaml 10:5), not the key's. + assert result == "In: packages in other.yaml 10:5" + + +def test_format_path_current_obj_without_location_falls_back_to_key(): + """An ESPHomeDataBase current_obj with no esp_range falls back to the key's location.""" + + class _NoRange(ESPHomeDataBase, str): + pass + + obj = _NoRange.__new__(_NoRange, "value") + str.__init__(obj) + # No _esp_range set on this instance. + assert obj.esp_range is None + + path = [_located("packages", "main.yaml", 5, 2)] + result = format_path(path, obj) + assert result == "In: packages in main.yaml 6:3" + + +def test_format_path_empty_path_with_located_current_obj(): + """An empty path with a located current_obj still surfaces the location.""" + obj = _located("${var}", "main.yaml", 0, 0) + result = format_path([], obj) + assert result == "In: in main.yaml 1:1"