diff --git a/esphome/__main__.py b/esphome/__main__.py index 87abd7f796..a696cceffb 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -1242,6 +1242,38 @@ def command_clean(args: ArgsProtocol, config: ConfigType) -> int | None: return 0 +def command_bundle(args: ArgsProtocol, config: ConfigType) -> int | None: + from esphome.bundle import BUNDLE_EXTENSION, ConfigBundleCreator + + creator = ConfigBundleCreator(config) + + if args.list_only: + files = creator.discover_files() + for bf in sorted(files, key=lambda f: f.path): + safe_print(f" {bf.path}") + _LOGGER.info("Found %d files", len(files)) + return 0 + + result = creator.create_bundle() + + if args.output: + output_path = Path(args.output) + else: + stem = CORE.config_path.stem + output_path = CORE.config_dir / f"{stem}{BUNDLE_EXTENSION}" + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(result.data) + + _LOGGER.info( + "Bundle created: %s (%d files, %.1f KB)", + output_path, + len(result.files), + len(result.data) / 1024, + ) + return 0 + + def command_dashboard(args: ArgsProtocol) -> int | None: from esphome.dashboard import dashboard @@ -1517,6 +1549,7 @@ POST_CONFIG_ACTIONS = { "rename": command_rename, "discover": command_discover, "analyze-memory": command_analyze_memory, + "bundle": command_bundle, } SIMPLE_CONFIG_ACTIONS = [ @@ -1818,6 +1851,24 @@ def parse_args(argv): "configuration", help="Your YAML configuration file(s).", nargs="+" ) + parser_bundle = subparsers.add_parser( + "bundle", + help="Create a self-contained config bundle for remote compilation.", + ) + parser_bundle.add_argument( + "configuration", help="Your YAML configuration file(s).", nargs="+" + ) + parser_bundle.add_argument( + "-o", + "--output", + help="Output path for the bundle archive.", + ) + parser_bundle.add_argument( + "--list-only", + help="List discovered files without creating the archive.", + action="store_true", + ) + # Keep backward compatibility with the old command line format of # esphome . # @@ -1896,6 +1947,16 @@ def run_esphome(argv): _LOGGER.warning("Skipping secrets file %s", conf_path) return 0 + # Bundle support: if the configuration is a .esphomebundle, extract it + # and rewrite conf_path to the extracted YAML config. + from esphome.bundle import is_bundle_path, prepare_bundle_for_compile + + if is_bundle_path(conf_path): + _LOGGER.info("Extracting config bundle %s...", conf_path) + conf_path = prepare_bundle_for_compile(conf_path) + # Update the argument so downstream code sees the extracted path + args.configuration[0] = str(conf_path) + CORE.config_path = conf_path CORE.dashboard = args.dashboard diff --git a/esphome/bundle.py b/esphome/bundle.py new file mode 100644 index 0000000000..b6816c7c95 --- /dev/null +++ b/esphome/bundle.py @@ -0,0 +1,699 @@ +"""Config bundle creator and extractor for ESPHome. + +A bundle is a self-contained .tar.gz archive containing a YAML config +and every local file it depends on. Bundles can be created from a config +and compiled directly: ``esphome compile my_device.esphomebundle.tar.gz`` +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import StrEnum +import io +import json +import logging +from pathlib import Path +import re +import shutil +import tarfile +from typing import Any + +from esphome import const, yaml_util +from esphome.const import ( + CONF_ESPHOME, + CONF_EXTERNAL_COMPONENTS, + CONF_INCLUDES, + CONF_INCLUDES_C, + CONF_PATH, + CONF_SOURCE, + CONF_TYPE, +) +from esphome.core import CORE, EsphomeError + +_LOGGER = logging.getLogger(__name__) + +BUNDLE_EXTENSION = ".esphomebundle.tar.gz" +MANIFEST_FILENAME = "manifest.json" +CURRENT_MANIFEST_VERSION = 1 +MAX_DECOMPRESSED_SIZE = 500 * 1024 * 1024 # 500 MB +MAX_MANIFEST_SIZE = 1024 * 1024 # 1 MB + +# Directories preserved across bundle extractions (build caches) +_PRESERVE_DIRS = (".esphome", ".pioenvs", ".pio") +_BUNDLE_STAGING_DIR = ".bundle_staging" + + +class ManifestKey(StrEnum): + """Keys used in bundle manifest.json.""" + + MANIFEST_VERSION = "manifest_version" + ESPHOME_VERSION = "esphome_version" + CONFIG_FILENAME = "config_filename" + FILES = "files" + HAS_SECRETS = "has_secrets" + + +# String prefixes that are never local file paths +_NON_PATH_PREFIXES = ("http://", "https://", "ftp://", "mdi:", "<") + +# File extensions recognized when resolving relative path strings. +# A relative string with one of these extensions is resolved against the +# config directory and included if the file exists. +_KNOWN_FILE_EXTENSIONS = frozenset( + { + # Fonts + ".ttf", + ".otf", + ".woff", + ".woff2", + ".pcf", + ".bdf", + # Images + ".png", + ".jpg", + ".jpeg", + ".bmp", + ".gif", + ".svg", + ".ico", + ".webp", + # Certificates + ".pem", + ".crt", + ".key", + ".der", + ".p12", + ".pfx", + # C/C++ includes + ".h", + ".hpp", + ".c", + ".cpp", + ".ino", + # Web assets + ".css", + ".js", + ".html", + } +) + + +# Matches !secret references in YAML text. This is intentionally a simple +# regex scan rather than a YAML parse — it may match inside comments or +# multi-line strings, which is the conservative direction (include more +# secrets rather than fewer). +_SECRET_RE = re.compile(r"!secret\s+(\S+)") + + +def _find_used_secret_keys(yaml_files: list[Path]) -> set[str]: + """Scan YAML files for ``!secret `` references.""" + keys: set[str] = set() + for fpath in yaml_files: + try: + text = fpath.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + continue + for match in _SECRET_RE.finditer(text): + keys.add(match.group(1)) + return keys + + +@dataclass +class BundleFile: + """A file to include in the bundle.""" + + path: str # Relative path inside the archive + source: Path # Absolute path on disk + + +@dataclass +class BundleResult: + """Result of creating a bundle.""" + + data: bytes + manifest: dict[str, Any] + files: list[BundleFile] + + +@dataclass +class BundleManifest: + """Parsed and validated bundle manifest.""" + + manifest_version: int + esphome_version: str + config_filename: str + files: list[str] + has_secrets: bool + + +class ConfigBundleCreator: + """Creates a self-contained bundle from an ESPHome config.""" + + def __init__(self, config: dict[str, Any]) -> None: + self._config = config + self._config_dir = CORE.config_dir + self._config_path = CORE.config_path + self._files: list[BundleFile] = [] + self._seen_paths: set[Path] = set() + self._secrets_paths: set[Path] = set() + + def discover_files(self) -> list[BundleFile]: + """Discover all files needed for the bundle.""" + self._files = [] + self._seen_paths = set() + self._secrets_paths = set() + + # The main config file + self._add_file(self._config_path) + + # Phase 1: YAML includes (tracked during config loading) + self._discover_yaml_includes() + + # Phase 2: Component-referenced files from validated config + self._discover_component_files() + + return list(self._files) + + def create_bundle(self) -> BundleResult: + """Create the bundle archive.""" + files = self.discover_files() + + # Determine which secret keys are actually referenced by the + # bundled YAML files so we only ship those, not the entire + # secrets.yaml which may contain secrets for other devices. + yaml_sources = [ + bf.source for bf in files if bf.source.suffix in (".yaml", ".yml") + ] + used_secret_keys = _find_used_secret_keys(yaml_sources) + filtered_secrets = self._build_filtered_secrets(used_secret_keys) + + has_secrets = bool(filtered_secrets) + if has_secrets: + _LOGGER.warning( + "Bundle contains secrets (e.g. Wi-Fi passwords). " + "Do not share it with untrusted parties." + ) + + manifest = self._build_manifest(files, has_secrets=has_secrets) + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + # Add manifest first + manifest_data = json.dumps(manifest, indent=2).encode("utf-8") + _add_bytes_to_tar(tar, MANIFEST_FILENAME, manifest_data) + + # Add filtered secrets files + for rel_path, data in sorted(filtered_secrets.items()): + _add_bytes_to_tar(tar, rel_path, data) + + # Add files in sorted order for determinism, skipping secrets + # files which were already added above with filtered content + for bf in sorted(files, key=lambda f: f.path): + if bf.source in self._secrets_paths: + continue + self._add_to_tar(tar, bf) + + return BundleResult(data=buf.getvalue(), manifest=manifest, files=files) + + def _add_file(self, abs_path: Path) -> bool: + """Add a file to the bundle. Returns False if already added.""" + abs_path = abs_path.resolve() + if abs_path in self._seen_paths: + return False + if not abs_path.is_file(): + _LOGGER.warning("Bundle: skipping missing file %s", abs_path) + return False + + rel_path = self._relative_to_config_dir(abs_path) + if rel_path is None: + _LOGGER.warning( + "Bundle: skipping file outside config directory: %s", abs_path + ) + return False + + self._seen_paths.add(abs_path) + self._files.append(BundleFile(path=rel_path, source=abs_path)) + return True + + def _add_directory(self, abs_path: Path) -> None: + """Recursively add all files in a directory.""" + abs_path = abs_path.resolve() + if not abs_path.is_dir(): + _LOGGER.warning("Bundle: skipping missing directory %s", abs_path) + return + for child in sorted(abs_path.rglob("*")): + if child.is_file() and "__pycache__" not in child.parts: + self._add_file(child) + + def _relative_to_config_dir(self, abs_path: Path) -> str | None: + """Get a path relative to the config directory. Returns None if outside. + + Always uses forward slashes for consistency in tar archives. + """ + try: + return abs_path.relative_to(self._config_dir).as_posix() + except ValueError: + return None + + def _discover_yaml_includes(self) -> None: + """Discover YAML files loaded during config parsing. + + We track files by wrapping _load_yaml_internal. The config has already + been loaded at this point (bundle is a POST_CONFIG_ACTION), so we + re-load just to discover the file list. + + Secrets files are tracked separately so we can filter them to + only include the keys this config actually references. + """ + with yaml_util.track_yaml_loads() as loaded_files: + try: + yaml_util.load_yaml(self._config_path) + except EsphomeError: + _LOGGER.debug( + "Bundle: re-loading YAML for include discovery failed, " + "proceeding with partial file list" + ) + + for fpath in loaded_files: + if fpath == self._config_path.resolve(): + continue # Already added as config + if fpath.name in const.SECRETS_FILES: + self._secrets_paths.add(fpath) + self._add_file(fpath) + + def _discover_component_files(self) -> None: + """Walk the validated config for file references. + + Uses a generic recursive walk to find file paths instead of + hardcoding per-component knowledge about config dict formats. + After validation, components typically resolve paths to absolute + using CORE.relative_config_path() or cv.file_(). Relative paths + with known file extensions are also resolved and checked. + + Core ESPHome concepts that use relative paths or directories + are handled explicitly. + """ + config = self._config + + # Generic walk: find all file paths in the validated config + self._walk_config_for_files(config) + + # --- Core ESPHome concepts needing explicit handling --- + + # esphome.includes / includes_c - can be relative paths and directories + esphome_conf = config.get(CONF_ESPHOME, {}) + for include_path in esphome_conf.get(CONF_INCLUDES, []): + resolved = _resolve_include_path(include_path) + if resolved is None: + continue + if resolved.is_dir(): + self._add_directory(resolved) + else: + self._add_file(resolved) + for include_path in esphome_conf.get(CONF_INCLUDES_C, []): + resolved = _resolve_include_path(include_path) + if resolved is not None: + self._add_file(resolved) + + # external_components with source: local - directories + for ext_conf in config.get(CONF_EXTERNAL_COMPONENTS, []): + source = ext_conf.get(CONF_SOURCE, {}) + if not isinstance(source, dict): + continue + if source.get(CONF_TYPE) != "local": + continue + path = source.get(CONF_PATH) + if not path: + continue + p = Path(path) + if not p.is_absolute(): + p = CORE.relative_config_path(p) + self._add_directory(p) + + def _walk_config_for_files(self, obj: Any) -> None: + """Recursively walk the config dict looking for file path references.""" + if isinstance(obj, dict): + for value in obj.values(): + self._walk_config_for_files(value) + elif isinstance(obj, (list, tuple)): + for item in obj: + self._walk_config_for_files(item) + elif isinstance(obj, Path): + if obj.is_absolute() and obj.is_file(): + self._add_file(obj) + elif isinstance(obj, str): + self._check_string_path(obj) + + def _check_string_path(self, value: str) -> None: + """Check if a string value is a local file reference.""" + # Fast exits for strings that cannot be file paths + if len(value) < 2 or "\n" in value: + return + if value.startswith(_NON_PATH_PREFIXES): + return + # File paths must contain a path separator or a dot (for extension) + if "/" not in value and "\\" not in value and "." not in value: + return + + p = Path(value) + + # Absolute path - check if it points to an existing file + if p.is_absolute(): + if p.is_file(): + self._add_file(p) + return + + # Relative path with a known file extension - likely a component + # validator that forgot to resolve to absolute via cv.file_() or + # CORE.relative_config_path(). Warn and try to resolve. + if p.suffix.lower() in _KNOWN_FILE_EXTENSIONS: + _LOGGER.warning( + "Bundle: non-absolute path in validated config: %s " + "(component validator should return absolute paths)", + value, + ) + resolved = CORE.relative_config_path(p) + if resolved.is_file(): + self._add_file(resolved) + + def _build_filtered_secrets(self, used_keys: set[str]) -> dict[str, bytes]: + """Build filtered secrets files containing only the referenced keys. + + Returns a dict mapping relative archive path to YAML bytes. + """ + if not used_keys or not self._secrets_paths: + return {} + + result: dict[str, bytes] = {} + for secrets_path in self._secrets_paths: + rel_path = self._relative_to_config_dir(secrets_path) + if rel_path is None: + continue + try: + all_secrets = yaml_util.load_yaml(secrets_path, clear_secrets=False) + except EsphomeError: + _LOGGER.warning("Bundle: failed to load secrets file %s", secrets_path) + continue + if not isinstance(all_secrets, dict): + continue + filtered = {k: v for k, v in all_secrets.items() if k in used_keys} + if filtered: + data = yaml_util.dump(filtered, show_secrets=True).encode("utf-8") + result[rel_path] = data + return result + + def _build_manifest( + self, files: list[BundleFile], *, has_secrets: bool + ) -> dict[str, Any]: + """Build the manifest.json content.""" + return { + ManifestKey.MANIFEST_VERSION: CURRENT_MANIFEST_VERSION, + ManifestKey.ESPHOME_VERSION: const.__version__, + ManifestKey.CONFIG_FILENAME: self._config_path.name, + ManifestKey.FILES: [f.path for f in files], + ManifestKey.HAS_SECRETS: has_secrets, + } + + @staticmethod + def _add_to_tar(tar: tarfile.TarFile, bf: BundleFile) -> None: + """Add a BundleFile to the tar archive with deterministic metadata.""" + with open(bf.source, "rb") as f: + _add_bytes_to_tar(tar, bf.path, f.read()) + + +def extract_bundle( + bundle_path: Path, + target_dir: Path | None = None, +) -> Path: + """Extract a bundle archive and return the path to the config YAML. + + Sanity checks reject path traversal, symlinks, absolute paths, and + oversized archives to prevent accidental file overwrites or extraction + outside the target directory. These are **not** a security boundary — + bundles are assumed to come from the user's own machine or a trusted + build pipeline. + + Args: + bundle_path: Path to the .tar.gz bundle file. + target_dir: Directory to extract into. If None, extracts next to + the bundle file in a directory named after it. + + Returns: + Absolute path to the extracted config YAML file. + + Raises: + EsphomeError: If the bundle is invalid or extraction fails. + """ + + bundle_path = bundle_path.resolve() + if not bundle_path.is_file(): + raise EsphomeError(f"Bundle file not found: {bundle_path}") + + if target_dir is None: + target_dir = _default_target_dir(bundle_path) + + target_dir = target_dir.resolve() + target_dir.mkdir(parents=True, exist_ok=True) + + # Read and validate the archive + try: + with tarfile.open(bundle_path, "r:gz") as tar: + manifest = _read_manifest_from_tar(tar) + _validate_tar_members(tar, target_dir) + tar.extractall(path=target_dir, filter="data") + except tarfile.TarError as err: + raise EsphomeError(f"Failed to extract bundle: {err}") from err + + config_filename = manifest[ManifestKey.CONFIG_FILENAME] + config_path = target_dir / config_filename + if not config_path.is_file(): + raise EsphomeError( + f"Bundle manifest references config '{config_filename}' " + f"but it was not found in the archive" + ) + + return config_path + + +def read_bundle_manifest(bundle_path: Path) -> BundleManifest: + """Read and validate the manifest from a bundle without full extraction. + + Args: + bundle_path: Path to the .tar.gz bundle file. + + Returns: + Parsed BundleManifest. + + Raises: + EsphomeError: If the manifest is missing, invalid, or version unsupported. + """ + + try: + with tarfile.open(bundle_path, "r:gz") as tar: + manifest = _read_manifest_from_tar(tar) + except tarfile.TarError as err: + raise EsphomeError(f"Failed to read bundle: {err}") from err + + return BundleManifest( + manifest_version=manifest[ManifestKey.MANIFEST_VERSION], + esphome_version=manifest.get(ManifestKey.ESPHOME_VERSION, "unknown"), + config_filename=manifest[ManifestKey.CONFIG_FILENAME], + files=manifest.get(ManifestKey.FILES, []), + has_secrets=manifest.get(ManifestKey.HAS_SECRETS, False), + ) + + +def _read_manifest_from_tar(tar: tarfile.TarFile) -> dict[str, Any]: + """Read and validate manifest.json from an open tar archive.""" + + try: + member = tar.getmember(MANIFEST_FILENAME) + except KeyError: + raise EsphomeError("Invalid bundle: missing manifest.json") from None + + f = tar.extractfile(member) + if f is None: + raise EsphomeError("Invalid bundle: manifest.json is not a regular file") + + if member.size > MAX_MANIFEST_SIZE: + raise EsphomeError( + f"Invalid bundle: manifest.json too large " + f"({member.size} bytes, max {MAX_MANIFEST_SIZE})" + ) + + try: + manifest = json.loads(f.read()) + except (json.JSONDecodeError, UnicodeDecodeError) as err: + raise EsphomeError(f"Invalid bundle: malformed manifest.json: {err}") from err + + # Version check + version = manifest.get(ManifestKey.MANIFEST_VERSION) + if version is None: + raise EsphomeError("Invalid bundle: manifest.json missing 'manifest_version'") + if not isinstance(version, int) or version < 1: + raise EsphomeError( + f"Invalid bundle: manifest_version must be a positive integer, got {version!r}" + ) + if version > CURRENT_MANIFEST_VERSION: + raise EsphomeError( + f"Bundle manifest version {version} is newer than this ESPHome " + f"version supports (max {CURRENT_MANIFEST_VERSION}). " + f"Please upgrade ESPHome to compile this bundle." + ) + + # Required fields + if ManifestKey.CONFIG_FILENAME not in manifest: + raise EsphomeError("Invalid bundle: manifest.json missing 'config_filename'") + + return manifest + + +def _validate_tar_members(tar: tarfile.TarFile, target_dir: Path) -> None: + """Sanity-check tar members to prevent mistakes and accidental overwrites. + + This is not a security boundary — bundles are created locally or come + from a trusted build pipeline. The checks catch malformed archives + and common mistakes (stray absolute paths, ``..`` components) that + could silently overwrite unrelated files. + """ + + total_size = 0 + for member in tar.getmembers(): + # Reject absolute paths (Unix and Windows) + if member.name.startswith(("/", "\\")): + raise EsphomeError( + f"Invalid bundle: absolute path in archive: {member.name}" + ) + + # Reject path traversal (split on both / and \ for cross-platform) + parts = re.split(r"[/\\]", member.name) + if ".." in parts: + raise EsphomeError( + f"Invalid bundle: path traversal in archive: {member.name}" + ) + + # Reject symlinks + if member.issym() or member.islnk(): + raise EsphomeError(f"Invalid bundle: symlink in archive: {member.name}") + + # Ensure extraction stays within target_dir + target_path = (target_dir / member.name).resolve() + if not target_path.is_relative_to(target_dir): + raise EsphomeError( + f"Invalid bundle: file would extract outside target: {member.name}" + ) + + # Track total decompressed size + total_size += member.size + if total_size > MAX_DECOMPRESSED_SIZE: + raise EsphomeError( + f"Invalid bundle: decompressed size exceeds " + f"{MAX_DECOMPRESSED_SIZE // (1024 * 1024)}MB limit" + ) + + +def is_bundle_path(path: Path) -> bool: + """Check if a path looks like a bundle file.""" + return path.name.lower().endswith(BUNDLE_EXTENSION) + + +def _add_bytes_to_tar(tar: tarfile.TarFile, name: str, data: bytes) -> None: + """Add in-memory bytes to a tar archive with deterministic metadata.""" + info = tarfile.TarInfo(name=name) + info.size = len(data) + info.mtime = 0 + info.uid = 0 + info.gid = 0 + info.mode = 0o644 + tar.addfile(info, io.BytesIO(data)) + + +def _resolve_include_path(include_path: Any) -> Path | None: + """Resolve an include path to absolute, skipping system includes.""" + if isinstance(include_path, str) and include_path.startswith("<"): + return None # System include, not a local file + p = Path(include_path) + if not p.is_absolute(): + p = CORE.relative_config_path(p) + return p + + +def _default_target_dir(bundle_path: Path) -> Path: + """Compute the default extraction directory for a bundle.""" + name = bundle_path.name + if name.lower().endswith(BUNDLE_EXTENSION): + name = name[: -len(BUNDLE_EXTENSION)] + return bundle_path.parent / name + + +def _restore_preserved_dirs(preserved: dict[str, Path], target_dir: Path) -> None: + """Move preserved build cache directories back into target_dir. + + If the bundle contained entries under a preserved directory name, + the extracted copy is removed so the original cache always wins. + """ + for dirname, src in preserved.items(): + dst = target_dir / dirname + if dst.exists(): + shutil.rmtree(dst) + shutil.move(str(src), str(dst)) + + +def prepare_bundle_for_compile( + bundle_path: Path, + target_dir: Path | None = None, +) -> Path: + """Extract a bundle for compilation, preserving build caches. + + Unlike extract_bundle(), this preserves .esphome/ and .pioenvs/ + directories in the target if they already exist (for incremental builds). + + Args: + bundle_path: Path to the .tar.gz bundle file. + target_dir: Directory to extract into. Must be specified for + build server use. + + Returns: + Absolute path to the extracted config YAML file. + """ + + bundle_path = bundle_path.resolve() + if not bundle_path.is_file(): + raise EsphomeError(f"Bundle file not found: {bundle_path}") + + if target_dir is None: + target_dir = _default_target_dir(bundle_path) + + target_dir = target_dir.resolve() + target_dir.mkdir(parents=True, exist_ok=True) + + preserved: dict[str, Path] = {} + + # Temporarily move preserved dirs out of the way + staging = target_dir / _BUNDLE_STAGING_DIR + for dirname in _PRESERVE_DIRS: + src = target_dir / dirname + if src.is_dir(): + dst = staging / dirname + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(src), str(dst)) + preserved[dirname] = dst + + try: + # Clean non-preserved content and extract fresh + for item in target_dir.iterdir(): + if item.name == _BUNDLE_STAGING_DIR: + continue + if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() + + config_path = extract_bundle(bundle_path, target_dir) + finally: + # Restore preserved dirs (idempotent) and clean staging + _restore_preserved_dirs(preserved, target_dir) + if staging.is_dir(): + shutil.rmtree(staging) + + return config_path diff --git a/esphome/components/wifi/wpa2_eap.py b/esphome/components/wifi/wpa2_eap.py index 5d5bd8dca3..9da3494329 100644 --- a/esphome/components/wifi/wpa2_eap.py +++ b/esphome/components/wifi/wpa2_eap.py @@ -71,9 +71,11 @@ def _validate_load_certificate(value): def validate_certificate(value): + # _validate_load_certificate already calls cv.file_() internally, + # but returns the parsed certificate object. We re-call cv.file_() + # to get the resolved path string that the bundle walker can discover. _validate_load_certificate(value) - # Validation result should be the path, not the loaded certificate - return value + return str(cv.file_(value)) def _validate_load_private_key(key, cert_pw): diff --git a/esphome/yaml_util.py b/esphome/yaml_util.py index e001316a22..a24c1ebccb 100644 --- a/esphome/yaml_util.py +++ b/esphome/yaml_util.py @@ -1,7 +1,7 @@ from __future__ import annotations -from collections.abc import Callable -from contextlib import suppress +from collections.abc import Callable, Generator +from contextlib import contextmanager, suppress import functools import inspect from io import BytesIO, TextIOBase, TextIOWrapper @@ -44,6 +44,27 @@ _LOGGER = logging.getLogger(__name__) SECRET_YAML = "secrets.yaml" _SECRET_CACHE = {} _SECRET_VALUES = {} +# Not thread-safe — config processing is single-threaded today. +_load_listeners: list[Callable[[Path], None]] = [] + + +@contextmanager +def track_yaml_loads() -> Generator[list[Path]]: + """Context manager that records every file loaded by the YAML loader. + + Yields a list that is populated with resolved Path objects for every + file loaded through ``_load_yaml_internal`` while the context is active. + """ + loaded: list[Path] = [] + + def _on_load(fname: Path) -> None: + loaded.append(Path(fname).resolve()) + + _load_listeners.append(_on_load) + try: + yield loaded + finally: + _load_listeners.remove(_on_load) class ESPHomeDataBase: @@ -466,6 +487,8 @@ def load_yaml(fname: Path, clear_secrets: bool = True) -> Any: def _load_yaml_internal(fname: Path) -> Any: """Load a YAML file.""" + for listener in _load_listeners: + listener(fname) try: with fname.open(encoding="utf-8") as f_handle: return parse_yaml(fname, f_handle) @@ -473,10 +496,10 @@ def _load_yaml_internal(fname: Path) -> Any: raise EsphomeError(f"Error reading file {fname}: {err}") from err -def parse_yaml( - file_name: Path, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal -) -> Any: +def parse_yaml(file_name: Path, file_handle: TextIOWrapper, yaml_loader=None) -> Any: """Parse a YAML file.""" + if yaml_loader is None: + yaml_loader = _load_yaml_internal try: return _load_yaml_internal_with_type( ESPHomeLoader, file_name, file_handle, yaml_loader diff --git a/tests/unit_tests/fixtures/bundle/assets/certs/ca_cert.pem b/tests/unit_tests/fixtures/bundle/assets/certs/ca_cert.pem new file mode 100644 index 0000000000..6d200b15ef --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/assets/certs/ca_cert.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIICzjCCAbagAwIBAgIUW3BzjtekVgMj12/oeXawSswGyXMwDQYJKoZIhvcNAQEL +BQAwITEfMB0GA1UEAwwWRVNQSG9tZSBCdW5kbGUgVGVzdCBDQTAeFw0yNjAyMDYx +MzMxMTZaFw0yNzAyMDYxMzMxMTZaMCExHzAdBgNVBAMMFkVTUEhvbWUgQnVuZGxl +IFRlc3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG62vBFkGn +hEu54gh2A7b1ZwesVadZ6u0iaVO7GSWiI0o4nb6xv7ULZbGrgsKNIO6qCV4VSR3p +BfMhF5dFy8kkMzA8dKZMk16tygzocdNum2QQ8BHyIsATL7SGZ33si9Alp30gXv6h +XSlEKYDKHFavkDhWPFNa5+oeHbMS/MxjpOUXIpq32VaFpJr427d9Y9wGjuK8B7Gp +CI5Ub1g2dpC9xSHqQKD3JZokmtc70+mD74AcNWbyxWp0bkW9wOfNJJnAoiwhJxQ8 +yfE37UsUIVc8014NhdhU1K/S0iQuOKfGX1L/GAshv8syQIcDfzJuJdX+5E/leAYD +UEKqRkcLT+D5AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAF1HpJ6d+W5WrzOQrGej +41pxCDeJ9tSiSj/KtvJfjEVIpg0hMRTY7nSL7OAg9KGESfx4u1jMwVnyOv34br5B +DTlRl+wF2k7Ip8CNnyZfCC+1SVQZpUt1mVNz8BhIZZ9/a830wCILNQQrVKkSeNBk +SEc1qTt4mIhQZ+M422qAswluv4fz/FW1f4oB9KhCpzUCANjmyERnqTnImjnJu8h0 +jbPNnNsN+G+Roju8UD/7atWYfAUmDjHx72Ci/5G9SzoM5fhgxxu43XYd5RW5wBzt +j4KdKdYlDtOL62mRPKWd40uGnJcieUjisU7noRn0ErMgbUlhLdbXT9X7aNborZcu +x6I= +-----END CERTIFICATE----- diff --git a/tests/unit_tests/fixtures/bundle/assets/certs/client_cert.pem b/tests/unit_tests/fixtures/bundle/assets/certs/client_cert.pem new file mode 100644 index 0000000000..6d200b15ef --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/assets/certs/client_cert.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIICzjCCAbagAwIBAgIUW3BzjtekVgMj12/oeXawSswGyXMwDQYJKoZIhvcNAQEL +BQAwITEfMB0GA1UEAwwWRVNQSG9tZSBCdW5kbGUgVGVzdCBDQTAeFw0yNjAyMDYx +MzMxMTZaFw0yNzAyMDYxMzMxMTZaMCExHzAdBgNVBAMMFkVTUEhvbWUgQnVuZGxl +IFRlc3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG62vBFkGn +hEu54gh2A7b1ZwesVadZ6u0iaVO7GSWiI0o4nb6xv7ULZbGrgsKNIO6qCV4VSR3p +BfMhF5dFy8kkMzA8dKZMk16tygzocdNum2QQ8BHyIsATL7SGZ33si9Alp30gXv6h +XSlEKYDKHFavkDhWPFNa5+oeHbMS/MxjpOUXIpq32VaFpJr427d9Y9wGjuK8B7Gp +CI5Ub1g2dpC9xSHqQKD3JZokmtc70+mD74AcNWbyxWp0bkW9wOfNJJnAoiwhJxQ8 +yfE37UsUIVc8014NhdhU1K/S0iQuOKfGX1L/GAshv8syQIcDfzJuJdX+5E/leAYD +UEKqRkcLT+D5AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAF1HpJ6d+W5WrzOQrGej +41pxCDeJ9tSiSj/KtvJfjEVIpg0hMRTY7nSL7OAg9KGESfx4u1jMwVnyOv34br5B +DTlRl+wF2k7Ip8CNnyZfCC+1SVQZpUt1mVNz8BhIZZ9/a830wCILNQQrVKkSeNBk +SEc1qTt4mIhQZ+M422qAswluv4fz/FW1f4oB9KhCpzUCANjmyERnqTnImjnJu8h0 +jbPNnNsN+G+Roju8UD/7atWYfAUmDjHx72Ci/5G9SzoM5fhgxxu43XYd5RW5wBzt +j4KdKdYlDtOL62mRPKWd40uGnJcieUjisU7noRn0ErMgbUlhLdbXT9X7aNborZcu +x6I= +-----END CERTIFICATE----- diff --git a/tests/unit_tests/fixtures/bundle/assets/certs/client_key.pem b/tests/unit_tests/fixtures/bundle/assets/certs/client_key.pem new file mode 100644 index 0000000000..6182f45d8b --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/assets/certs/client_key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAxutrwRZBp4RLueIIdgO29WcHrFWnWertImlTuxkloiNKOJ2+ +sb+1C2Wxq4LCjSDuqgleFUkd6QXzIReXRcvJJDMwPHSmTJNercoM6HHTbptkEPAR +8iLAEy+0hmd97IvQJad9IF7+oV0pRCmAyhxWr5A4VjxTWufqHh2zEvzMY6TlFyKa +t9lWhaSa+Nu3fWPcBo7ivAexqQiOVG9YNnaQvcUh6kCg9yWaJJrXO9Ppg++AHDVm +8sVqdG5FvcDnzSSZwKIsIScUPMnxN+1LFCFXPNNeDYXYVNSv0tIkLjinxl9S/xgL +Ib/LMkCHA38ybiXV/uRP5XgGA1BCqkZHC0/g+QIDAQABAoIBAEpsFwcJNCwf95MG +qcK5lhCPaRQFgdTG68ylmoGUIXvddy3ies+W2X33oLb5958ElLaCRbRyBCJEKxgU +8vBWk50bF69uty9MLa6YuyaWO5QUyCX8I8KzVKh4/zIP81F2Z7xGwy5CzEKED+Xk +Hz6+xoHt094TuN34iaOV2gM/GJsok4Wp/lzsuT3X6i3Nad9YGrV2yL/wv5c542bw +vrFDtYQ/+ADZZPW4+xK0ShiarSqV3iXB2cEjc4JX7yLX1hB4LY8VHRzl+Byjdl0/ +lheiIesl5htl82SFxquZDimDsbilTm7TLW2bbm3b3/oC7DchTx6COBjp90VJqk3R +QrO5dicCgYEA80pyA7tCB0bGnJ7KWkteKddyOdakeYeM7Bpfv17qbCm9ciMw9nqt +KJVZPtAuqZGTpfSJseOCIyz9zloB79hVJ3mdWpGJVvmNM5H+BJyCciXpwfqp64QG +1gMqGlSy/MwsZHqNCsOIvrzH09GFN0LSPNKeXN7GNAtU1vI5s7Xf158CgYEA0U+Y +Qe1qJY4m597spHNFfkGznoFXAjHOoWYHv95902cH6JD4GnYPfwFXxgFsrJhFaFMC +jXlT0fRFAIe4NuUJhGD6TYSJqsFkH3xJkAepvKpfjM5qJ7+PQHRnED/E5OS2Nj0R ++cxBhTEWTw9YiOFBRbj6hlphkj8izVGJZ2pL4GcCgYEApsjiYKx/F33tqnExR7Vj +WEvagswi9S137mQmP4tSKdRzi0uUxWRUUP4RsH4HfzfNgHej7c+J55Nwa4ZIzaQA +vI8i0HP1MyrhIflzqrWgt6BGIDU3R7268fw5YNOv4J4X0Moy5q4lkJzaYNvB96BX +gFrjNceDGSqrfq+P3yNP0QECgYBNQfHTM8ygPA4EO/Zg5ONbrOidsuPovXWlgUGP +ApKy+y6iGxBYxAcIO/in71KrijDkRu+ERKo5rs3hWjcWnAedQyZggnFGA8fvDzMf +5JQ0PTazhGUOcthvVAfOqZsFWZ4f+v6tk0UD4pB3chSdwXcUQyjFeorVLlSsMFJl +R4jmNQKBgG38YFR2bqIc7jJItr+34POXdJ4te8Dm1jJHbo8xXsnjVSaxjc5PGs3p +OuJpwuMwzEuFEnE7XLkQxTJw54OBLMmDgK0XUOPDq6eLzrKkW5NlpejqaQV9Piyo +q1kqbJan20jfJQUGTcX7FXHMUThzqJltHILR1GTW6I9z4k8xdsDY +-----END RSA PRIVATE KEY----- diff --git a/tests/unit_tests/fixtures/bundle/assets/fonts/test_font.ttf b/tests/unit_tests/fixtures/bundle/assets/fonts/test_font.ttf new file mode 100644 index 0000000000..4066b0a988 Binary files /dev/null and b/tests/unit_tests/fixtures/bundle/assets/fonts/test_font.ttf differ diff --git a/tests/unit_tests/fixtures/bundle/assets/images/animation.gif b/tests/unit_tests/fixtures/bundle/assets/images/animation.gif new file mode 100644 index 0000000000..9932e77448 Binary files /dev/null and b/tests/unit_tests/fixtures/bundle/assets/images/animation.gif differ diff --git a/tests/unit_tests/fixtures/bundle/assets/images/logo.png b/tests/unit_tests/fixtures/bundle/assets/images/logo.png new file mode 100644 index 0000000000..bd2fd54783 Binary files /dev/null and b/tests/unit_tests/fixtures/bundle/assets/images/logo.png differ diff --git a/tests/unit_tests/fixtures/bundle/assets/web/custom.css b/tests/unit_tests/fixtures/bundle/assets/web/custom.css new file mode 100644 index 0000000000..992b81c80e --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/assets/web/custom.css @@ -0,0 +1,2 @@ +/* Dummy CSS for bundle testing */ +body { color: red; } diff --git a/tests/unit_tests/fixtures/bundle/assets/web/custom.js b/tests/unit_tests/fixtures/bundle/assets/web/custom.js new file mode 100644 index 0000000000..9be8a6b2dc --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/assets/web/custom.js @@ -0,0 +1,2 @@ +// Dummy JS for bundle testing +console.log("test"); diff --git a/tests/unit_tests/fixtures/bundle/bundle_test.yaml b/tests/unit_tests/fixtures/bundle/bundle_test.yaml new file mode 100644 index 0000000000..f834a8d867 --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/bundle_test.yaml @@ -0,0 +1,60 @@ +esphome: + name: bundle-test + includes: + - includes/custom_sensor.h + +esp32: + board: esp32dev + framework: + type: esp-idf + +logger: + <<: !include common/base.yaml + +wifi: + ssid: !secret wifi_ssid + password: !secret wifi_password + +api: + +ota: + - platform: esphome + password: !secret ota_password + +web_server: + port: 80 + css_include: assets/web/custom.css + js_include: assets/web/custom.js + +i2c: + sda: GPIO21 + scl: GPIO22 + +font: + - id: test_font + file: assets/fonts/test_font.ttf + size: 16 + +image: + - id: test_image + file: assets/images/logo.png + type: BINARY + resize: 16x16 + +animation: + - id: test_animation + file: assets/images/animation.gif + type: BINARY + resize: 16x16 + +display: + - platform: ssd1306_i2c + model: SSD1306_128X64 + address: 0x3C + lambda: |- + it.image(0, 0, id(test_image)); + +external_components: + - source: + type: local + path: local_components diff --git a/tests/unit_tests/fixtures/bundle/common/base.yaml b/tests/unit_tests/fixtures/bundle/common/base.yaml new file mode 100644 index 0000000000..58e1083e82 --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/common/base.yaml @@ -0,0 +1 @@ +level: DEBUG diff --git a/tests/unit_tests/fixtures/bundle/includes/custom_sensor.h b/tests/unit_tests/fixtures/bundle/includes/custom_sensor.h new file mode 100644 index 0000000000..7f0ff474ee --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/includes/custom_sensor.h @@ -0,0 +1,3 @@ +// Dummy custom sensor header for bundle testing +#pragma once +#include "esphome/core/component.h" diff --git a/tests/unit_tests/fixtures/bundle/local_components/my_component/__init__.py b/tests/unit_tests/fixtures/bundle/local_components/my_component/__init__.py new file mode 100644 index 0000000000..aa9fc1474b --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/local_components/my_component/__init__.py @@ -0,0 +1 @@ +# Dummy local external component for bundle testing diff --git a/tests/unit_tests/fixtures/bundle/local_components/my_component/my_component.h b/tests/unit_tests/fixtures/bundle/local_components/my_component/my_component.h new file mode 100644 index 0000000000..19b89ecc82 --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/local_components/my_component/my_component.h @@ -0,0 +1,2 @@ +// Dummy component header for bundle testing +#pragma once diff --git a/tests/unit_tests/fixtures/bundle/secrets.yaml b/tests/unit_tests/fixtures/bundle/secrets.yaml new file mode 100644 index 0000000000..47acddb4d9 --- /dev/null +++ b/tests/unit_tests/fixtures/bundle/secrets.yaml @@ -0,0 +1,4 @@ +wifi_ssid: "TestNetwork" +wifi_password: "TestPassword123" +api_key: "unused_secret_should_not_appear" +ota_password: "ota_test_password" diff --git a/tests/unit_tests/test_bundle.py b/tests/unit_tests/test_bundle.py new file mode 100644 index 0000000000..b8b2d0ffd1 --- /dev/null +++ b/tests/unit_tests/test_bundle.py @@ -0,0 +1,1210 @@ +"""Tests for esphome.bundle module.""" + +from __future__ import annotations + +import io +import json +from pathlib import Path +import tarfile +from typing import Any + +import pytest + +from esphome.bundle import ( + BUNDLE_EXTENSION, + CURRENT_MANIFEST_VERSION, + MANIFEST_FILENAME, + BundleManifest, + ConfigBundleCreator, + ManifestKey, + _add_bytes_to_tar, + _default_target_dir, + _find_used_secret_keys, + extract_bundle, + is_bundle_path, + prepare_bundle_for_compile, + read_bundle_manifest, +) +from esphome.core import CORE, EsphomeError + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_bundle( + tmp_path: Path, + config_filename: str = "test.yaml", + config_content: str = "esphome:\n name: test\n", + manifest_overrides: dict[str, Any] | None = None, + extra_files: dict[str, bytes] | None = None, + *, + include_manifest: bool = True, + raw_members: list[tarfile.TarInfo] | None = None, +) -> Path: + """Create a minimal bundle tar.gz for testing.""" + bundle_path = tmp_path / f"device{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + if include_manifest: + manifest: dict[str, Any] = { + ManifestKey.MANIFEST_VERSION: CURRENT_MANIFEST_VERSION, + ManifestKey.ESPHOME_VERSION: "2026.2.0-test", + ManifestKey.CONFIG_FILENAME: config_filename, + ManifestKey.FILES: [config_filename], + ManifestKey.HAS_SECRETS: False, + } + if manifest_overrides: + manifest.update(manifest_overrides) + _add_bytes_to_tar(tar, MANIFEST_FILENAME, json.dumps(manifest).encode()) + + _add_bytes_to_tar(tar, config_filename, config_content.encode()) + + if extra_files: + for name, data in extra_files.items(): + _add_bytes_to_tar(tar, name, data) + + if raw_members: + for info in raw_members: + tar.addfile(info, io.BytesIO(b"")) + + bundle_path.write_bytes(buf.getvalue()) + return bundle_path + + +def _setup_config_dir( + tmp_path: Path, + files: dict[str, str] | None = None, +) -> Path: + """Set up a fake config directory with files and configure CORE.""" + config_dir = tmp_path / "config" + config_dir.mkdir() + + config_yaml = "esphome:\n name: test\n" + (config_dir / "test.yaml").write_text(config_yaml) + + if files: + for rel_path, content in files.items(): + p = config_dir / rel_path + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(content) + + CORE.config_path = config_dir / "test.yaml" + return config_dir + + +# --------------------------------------------------------------------------- +# is_bundle_path +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("filename", "expected"), + [ + (f"my_device{BUNDLE_EXTENSION}", True), + (f"MY_DEVICE{BUNDLE_EXTENSION.upper()}", True), + ("my_device.yaml", False), + ("my_device.tar.gz", False), + ("my_device.zip", False), + ("", False), + ], +) +def test_is_bundle_path(filename: str, expected: bool) -> None: + assert is_bundle_path(Path(filename)) is expected + + +# --------------------------------------------------------------------------- +# _default_target_dir +# --------------------------------------------------------------------------- + + +def test_default_target_dir_strips_extension() -> None: + p = Path(f"/builds/device{BUNDLE_EXTENSION}") + result = _default_target_dir(p) + assert result == Path("/builds/device") + + +def test_default_target_dir_no_extension() -> None: + p = Path("/builds/device.other") + result = _default_target_dir(p) + assert result == Path("/builds/device.other") + + +# --------------------------------------------------------------------------- +# _find_used_secret_keys +# --------------------------------------------------------------------------- + + +def test_find_used_secret_keys(tmp_path: Path) -> None: + yaml1 = tmp_path / "a.yaml" + yaml1.write_text("wifi:\n ssid: !secret wifi_ssid\n password: !secret wifi_pw\n") + yaml2 = tmp_path / "b.yaml" + yaml2.write_text("api:\n key: !secret api_key\n") + + keys = _find_used_secret_keys([yaml1, yaml2]) + assert keys == {"wifi_ssid", "wifi_pw", "api_key"} + + +def test_find_used_secret_keys_no_secrets(tmp_path: Path) -> None: + yaml1 = tmp_path / "a.yaml" + yaml1.write_text("esphome:\n name: test\n") + + keys = _find_used_secret_keys([yaml1]) + assert keys == set() + + +def test_find_used_secret_keys_missing_file(tmp_path: Path) -> None: + missing = tmp_path / "does_not_exist.yaml" + keys = _find_used_secret_keys([missing]) + assert keys == set() + + +def test_find_used_secret_keys_deduplicates(tmp_path: Path) -> None: + yaml1 = tmp_path / "a.yaml" + yaml1.write_text("a: !secret key1\nb: !secret key1\n") + + keys = _find_used_secret_keys([yaml1]) + assert keys == {"key1"} + + +# --------------------------------------------------------------------------- +# _add_bytes_to_tar +# --------------------------------------------------------------------------- + + +def test_add_bytes_to_tar_deterministic_metadata() -> None: + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + _add_bytes_to_tar(tar, "hello.txt", b"world") + + buf.seek(0) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + member = tar.getmember("hello.txt") + assert member.size == 5 + assert member.mtime == 0 + assert member.uid == 0 + assert member.gid == 0 + assert member.mode == 0o644 + assert tar.extractfile(member).read() == b"world" + + +# --------------------------------------------------------------------------- +# ManifestKey +# --------------------------------------------------------------------------- + + +def test_manifest_key_values() -> None: + assert ManifestKey.MANIFEST_VERSION == "manifest_version" + assert ManifestKey.ESPHOME_VERSION == "esphome_version" + assert ManifestKey.CONFIG_FILENAME == "config_filename" + assert ManifestKey.FILES == "files" + assert ManifestKey.HAS_SECRETS == "has_secrets" + + +def test_manifest_key_is_str() -> None: + """Verify ManifestKey values work as dict keys and JSON keys.""" + d: dict[str, int] = {ManifestKey.MANIFEST_VERSION: 1} + assert d["manifest_version"] == 1 + + +# --------------------------------------------------------------------------- +# extract_bundle +# --------------------------------------------------------------------------- + + +def test_extract_bundle_basic(tmp_path: Path) -> None: + bundle_path = _make_bundle(tmp_path) + target = tmp_path / "output" + + config_path = extract_bundle(bundle_path, target) + + assert config_path.is_file() + assert config_path.name == "test.yaml" + assert config_path.read_text().startswith("esphome:") + assert (target / MANIFEST_FILENAME).is_file() + + +def test_extract_bundle_default_target_dir(tmp_path: Path) -> None: + bundle_path = _make_bundle(tmp_path) + + config_path = extract_bundle(bundle_path) + + expected_dir = tmp_path / "device" + assert config_path.parent == expected_dir + + +def test_extract_bundle_missing_file(tmp_path: Path) -> None: + missing = tmp_path / f"missing{BUNDLE_EXTENSION}" + with pytest.raises(EsphomeError, match="Bundle file not found"): + extract_bundle(missing) + + +def test_extract_bundle_missing_manifest(tmp_path: Path) -> None: + bundle_path = _make_bundle(tmp_path, include_manifest=False) + with pytest.raises(EsphomeError, match="missing manifest.json"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_future_manifest_version(tmp_path: Path) -> None: + bundle_path = _make_bundle( + tmp_path, + manifest_overrides={ManifestKey.MANIFEST_VERSION: 999}, + ) + with pytest.raises(EsphomeError, match="newer than this ESPHome"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_missing_config_filename_in_manifest(tmp_path: Path) -> None: + """Manifest exists but is missing config_filename key.""" + bundle_path = tmp_path / f"bad{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + manifest = {ManifestKey.MANIFEST_VERSION: 1} + _add_bytes_to_tar(tar, MANIFEST_FILENAME, json.dumps(manifest).encode()) + _add_bytes_to_tar(tar, "test.yaml", b"esphome:\n name: test\n") + bundle_path.write_bytes(buf.getvalue()) + + with pytest.raises(EsphomeError, match="missing 'config_filename'"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_config_not_in_archive(tmp_path: Path) -> None: + """Manifest references a config file that isn't in the archive.""" + bundle_path = _make_bundle( + tmp_path, + config_filename="test.yaml", + manifest_overrides={ManifestKey.CONFIG_FILENAME: "missing.yaml"}, + ) + with pytest.raises(EsphomeError, match="was not found in the archive"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_with_extra_files(tmp_path: Path) -> None: + bundle_path = _make_bundle( + tmp_path, + extra_files={ + "common/base.yaml": b"level: DEBUG\n", + "includes/sensor.h": b"#pragma once\n", + }, + ) + target = tmp_path / "out" + extract_bundle(bundle_path, target) + + assert (target / "common" / "base.yaml").read_text() == "level: DEBUG\n" + assert (target / "includes" / "sensor.h").read_text() == "#pragma once\n" + + +# --------------------------------------------------------------------------- +# extract_bundle - security validation +# --------------------------------------------------------------------------- + + +def test_extract_bundle_rejects_absolute_path(tmp_path: Path) -> None: + info = tarfile.TarInfo(name="/etc/passwd") + info.size = 0 + bundle_path = _make_bundle(tmp_path, raw_members=[info]) + + with pytest.raises(EsphomeError, match="absolute path"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_rejects_path_traversal(tmp_path: Path) -> None: + info = tarfile.TarInfo(name="../../../etc/passwd") + info.size = 0 + bundle_path = _make_bundle(tmp_path, raw_members=[info]) + + with pytest.raises(EsphomeError, match="path traversal"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_rejects_backslash_path_traversal(tmp_path: Path) -> None: + info = tarfile.TarInfo(name="foo\\..\\..\\etc\\passwd") + info.size = 0 + bundle_path = _make_bundle(tmp_path, raw_members=[info]) + + with pytest.raises(EsphomeError, match="path traversal"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_rejects_symlink(tmp_path: Path) -> None: + info = tarfile.TarInfo(name="evil_link") + info.type = tarfile.SYMTYPE + info.linkname = "/etc/passwd" + info.size = 0 + bundle_path = _make_bundle(tmp_path, raw_members=[info]) + + with pytest.raises(EsphomeError, match="symlink"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_rejects_oversized( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Archive whose total decompressed size exceeds the limit is rejected.""" + # Lower the limit so we don't need huge test data + monkeypatch.setattr("esphome.bundle.MAX_DECOMPRESSED_SIZE", 100) + + bundle_path = _make_bundle( + tmp_path, + extra_files={"big.bin": b"\x00" * 200}, + ) + + with pytest.raises(EsphomeError, match="decompressed size exceeds"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_corrupted_tar(tmp_path: Path) -> None: + """Corrupted tar file raises EsphomeError.""" + bundle_path = tmp_path / f"bad{BUNDLE_EXTENSION}" + bundle_path.write_bytes(b"not a tar file at all") + + with pytest.raises(EsphomeError, match="Failed to extract bundle"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_malformed_manifest_json(tmp_path: Path) -> None: + """Invalid JSON in manifest.json raises EsphomeError.""" + bundle_path = tmp_path / f"badjson{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + _add_bytes_to_tar(tar, MANIFEST_FILENAME, b"{invalid json") + _add_bytes_to_tar(tar, "test.yaml", b"esphome:\n name: test\n") + bundle_path.write_bytes(buf.getvalue()) + + with pytest.raises(EsphomeError, match="malformed manifest.json"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_missing_manifest_version(tmp_path: Path) -> None: + """Manifest without manifest_version raises EsphomeError.""" + bundle_path = tmp_path / f"nover{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + manifest = {ManifestKey.CONFIG_FILENAME: "test.yaml"} + _add_bytes_to_tar(tar, MANIFEST_FILENAME, json.dumps(manifest).encode()) + _add_bytes_to_tar(tar, "test.yaml", b"esphome:\n name: test\n") + bundle_path.write_bytes(buf.getvalue()) + + with pytest.raises(EsphomeError, match="missing 'manifest_version'"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_invalid_manifest_version_type(tmp_path: Path) -> None: + """Non-integer manifest_version raises EsphomeError.""" + bundle_path = _make_bundle( + tmp_path, + manifest_overrides={ManifestKey.MANIFEST_VERSION: "not_an_int"}, + ) + + with pytest.raises(EsphomeError, match="must be a positive integer"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_manifest_version_zero(tmp_path: Path) -> None: + """manifest_version of 0 is rejected.""" + bundle_path = _make_bundle( + tmp_path, + manifest_overrides={ManifestKey.MANIFEST_VERSION: 0}, + ) + + with pytest.raises(EsphomeError, match="must be a positive integer"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_manifest_too_large( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Oversized manifest.json is rejected.""" + monkeypatch.setattr("esphome.bundle.MAX_MANIFEST_SIZE", 50) + + bundle_path = _make_bundle(tmp_path) + + with pytest.raises(EsphomeError, match="manifest.json too large"): + extract_bundle(bundle_path, tmp_path / "out") + + +def test_extract_bundle_manifest_not_regular_file(tmp_path: Path) -> None: + """manifest.json that is a directory entry raises EsphomeError.""" + bundle_path = tmp_path / f"dirmanifest{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + # Add manifest.json as a directory instead of a file + dir_info = tarfile.TarInfo(name=MANIFEST_FILENAME) + dir_info.type = tarfile.DIRTYPE + dir_info.size = 0 + tar.addfile(dir_info) + _add_bytes_to_tar(tar, "test.yaml", b"esphome:\n name: test\n") + bundle_path.write_bytes(buf.getvalue()) + + with pytest.raises(EsphomeError, match="not a regular file"): + extract_bundle(bundle_path, tmp_path / "out") + + +# --------------------------------------------------------------------------- +# read_bundle_manifest +# --------------------------------------------------------------------------- + + +def test_read_bundle_manifest_corrupted_tar(tmp_path: Path) -> None: + """Corrupted tar file raises EsphomeError via read_bundle_manifest.""" + bundle_path = tmp_path / f"bad{BUNDLE_EXTENSION}" + bundle_path.write_bytes(b"not a tar file") + + with pytest.raises(EsphomeError, match="Failed to read bundle"): + read_bundle_manifest(bundle_path) + + +def test_read_bundle_manifest(tmp_path: Path) -> None: + bundle_path = _make_bundle( + tmp_path, + manifest_overrides={ManifestKey.HAS_SECRETS: True}, + extra_files={"secrets.yaml": b"wifi: test\n"}, + ) + + manifest = read_bundle_manifest(bundle_path) + + assert isinstance(manifest, BundleManifest) + assert manifest.manifest_version == CURRENT_MANIFEST_VERSION + assert manifest.esphome_version == "2026.2.0-test" + assert manifest.config_filename == "test.yaml" + assert manifest.has_secrets is True + + +def test_read_bundle_manifest_minimal(tmp_path: Path) -> None: + """Manifest with only required fields.""" + bundle_path = tmp_path / f"min{BUNDLE_EXTENSION}" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + manifest = { + ManifestKey.MANIFEST_VERSION: 1, + ManifestKey.CONFIG_FILENAME: "cfg.yaml", + } + _add_bytes_to_tar(tar, MANIFEST_FILENAME, json.dumps(manifest).encode()) + _add_bytes_to_tar(tar, "cfg.yaml", b"") + bundle_path.write_bytes(buf.getvalue()) + + result = read_bundle_manifest(bundle_path) + assert result.esphome_version == "unknown" + assert result.files == [] + assert result.has_secrets is False + + +# --------------------------------------------------------------------------- +# prepare_bundle_for_compile +# --------------------------------------------------------------------------- + + +def test_prepare_bundle_preserves_build_cache(tmp_path: Path) -> None: + bundle_path = _make_bundle(tmp_path) + target = tmp_path / "work" + target.mkdir() + + # Pre-existing build cache + esphome_dir = target / ".esphome" + esphome_dir.mkdir() + (esphome_dir / "build_state.json").write_text('{"cached": true}') + + pio_dir = target / ".pioenvs" + pio_dir.mkdir() + (pio_dir / "firmware.bin").write_bytes(b"\x00" * 100) + + config_path = prepare_bundle_for_compile(bundle_path, target) + + assert config_path.is_file() + # Build caches should be preserved + assert (target / ".esphome" / "build_state.json").read_text() == '{"cached": true}' + assert (target / ".pioenvs" / "firmware.bin").read_bytes() == b"\x00" * 100 + + +def test_prepare_bundle_cleans_old_config(tmp_path: Path) -> None: + bundle_path = _make_bundle(tmp_path) + target = tmp_path / "work" + target.mkdir() + + # Old config from previous extraction + (target / "old_config.yaml").write_text("old: true") + old_dir = target / "old_includes" + old_dir.mkdir() + (old_dir / "old.h").write_text("// old") + + prepare_bundle_for_compile(bundle_path, target) + + # Old files should be cleaned + assert not (target / "old_config.yaml").exists() + assert not (target / "old_includes").exists() + # New config should exist + assert (target / "test.yaml").is_file() + + +def test_prepare_bundle_missing_file(tmp_path: Path) -> None: + missing = tmp_path / f"missing{BUNDLE_EXTENSION}" + with pytest.raises(EsphomeError, match="Bundle file not found"): + prepare_bundle_for_compile(missing) + + +def test_prepare_bundle_cache_wins_over_bundle_content(tmp_path: Path) -> None: + """Pre-existing build cache is restored even if the bundle contains those dirs.""" + bundle_path = _make_bundle( + tmp_path, + extra_files={ + ".esphome/from_bundle.json": b'{"from": "bundle"}', + }, + ) + target = tmp_path / "work" + target.mkdir() + + # Pre-existing build cache + esphome_dir = target / ".esphome" + esphome_dir.mkdir() + (esphome_dir / "local_cache.json").write_text('{"from": "local"}') + + prepare_bundle_for_compile(bundle_path, target) + + # Local cache should win over bundle content + assert (target / ".esphome" / "local_cache.json").read_text() == '{"from": "local"}' + assert not (target / ".esphome" / "from_bundle.json").exists() + + +def test_prepare_bundle_default_target_dir(tmp_path: Path) -> None: + """prepare_bundle_for_compile uses default dir when target_dir is None.""" + bundle_path = _make_bundle(tmp_path) + + config_path = prepare_bundle_for_compile(bundle_path) + + expected_dir = tmp_path / "device" + assert config_path.parent == expected_dir + assert config_path.is_file() + + +# --------------------------------------------------------------------------- +# ConfigBundleCreator - file discovery +# --------------------------------------------------------------------------- + + +def test_discover_files_includes_config(tmp_path: Path) -> None: + _setup_config_dir(tmp_path) + + creator = ConfigBundleCreator({}) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "test.yaml" in paths + + +def test_discover_files_finds_path_objects(tmp_path: Path) -> None: + """Path objects in validated config are discovered.""" + config_dir = _setup_config_dir( + tmp_path, + files={"assets/font.ttf": "fake font data"}, + ) + + config: dict[str, Any] = {"font": [{"file": config_dir / "assets" / "font.ttf"}]} + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "assets/font.ttf" in paths + + +def test_discover_files_finds_absolute_string_paths(tmp_path: Path) -> None: + """Absolute string paths in validated config are discovered.""" + config_dir = _setup_config_dir( + tmp_path, + files={"assets/logo.png": "fake png data"}, + ) + + abs_path = str(config_dir / "assets" / "logo.png") + config: dict[str, Any] = {"image": [{"file": abs_path}]} + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "assets/logo.png" in paths + + +def test_discover_files_skips_non_path_prefixes(tmp_path: Path) -> None: + """Remote URLs and special prefixes are not treated as file paths.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "font": [ + {"file": "https://example.com/font.ttf"}, + {"file": "mdi:home"}, + {"file": "http://example.com/icon.png"}, + ] + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + # Only the config file itself + assert len(files) == 1 + assert files[0].path == "test.yaml" + + +def test_discover_files_skips_multiline_strings(tmp_path: Path) -> None: + """Lambda/template strings are not treated as file paths.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "sensor": [{"lambda": "auto val = id(sensor1);\nreturn val;"}] + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + assert len(files) == 1 + + +def test_discover_files_deduplicates(tmp_path: Path) -> None: + """Same file referenced twice is only included once.""" + config_dir = _setup_config_dir( + tmp_path, + files={"cert.pem": "fake cert"}, + ) + + abs_path = str(config_dir / "cert.pem") + config: dict[str, Any] = { + "a": {"cert": abs_path}, + "b": {"cert": abs_path}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + cert_files = [f for f in files if f.path == "cert.pem"] + assert len(cert_files) == 1 + + +def test_discover_files_skips_outside_config_dir(tmp_path: Path) -> None: + """Files outside the config directory are skipped.""" + _setup_config_dir(tmp_path) + + outside_file = tmp_path / "outside.pem" + outside_file.write_text("outside cert") + + config: dict[str, Any] = {"cert": str(outside_file)} + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "outside.pem" not in paths + + +def test_discover_files_esphome_includes(tmp_path: Path) -> None: + """Paths listed in esphome.includes are discovered.""" + _setup_config_dir( + tmp_path, + files={"my_sensor.h": "#pragma once\n"}, + ) + + config: dict[str, Any] = { + "esphome": {"includes": ["my_sensor.h"]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "my_sensor.h" in paths + + +def test_discover_files_esphome_includes_directory(tmp_path: Path) -> None: + """esphome.includes pointing to a directory adds all files.""" + _setup_config_dir( + tmp_path, + files={ + "my_lib/a.h": "// a", + "my_lib/b.cpp": "// b", + }, + ) + + config: dict[str, Any] = { + "esphome": {"includes": ["my_lib"]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "my_lib/a.h" in paths + assert "my_lib/b.cpp" in paths + + +def test_discover_files_esphome_includes_skips_system(tmp_path: Path) -> None: + """System includes like are not added.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "esphome": {"includes": [""]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert len(paths) == 1 # Just test.yaml + + +def test_discover_files_external_components_local(tmp_path: Path) -> None: + """external_components with type: local adds the directory.""" + _setup_config_dir( + tmp_path, + files={ + "components/my_comp/__init__.py": "# comp", + "components/my_comp/sensor.py": "# sensor", + }, + ) + + config: dict[str, Any] = { + "external_components": [{"source": {"type": "local", "path": "components"}}], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "components/my_comp/__init__.py" in paths + assert "components/my_comp/sensor.py" in paths + + +def test_discover_files_external_components_skips_pycache(tmp_path: Path) -> None: + """__pycache__ directories inside local external_components are excluded.""" + _setup_config_dir( + tmp_path, + files={ + "components/my_comp/__init__.py": "# comp", + "components/my_comp/__pycache__/module.cpython-313.pyc": "bytecode", + }, + ) + + config: dict[str, Any] = { + "external_components": [{"source": {"type": "local", "path": "components"}}], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "components/my_comp/__init__.py" in paths + assert not any("__pycache__" in p for p in paths) + + +def test_discover_files_external_components_non_dict_source(tmp_path: Path) -> None: + """external_components with string source (e.g. github shorthand) is skipped.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "external_components": [{"source": "github://user/repo@main"}], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + # Only the config file itself - no crash from non-dict source + assert len(files) == 1 + assert files[0].path == "test.yaml" + + +def test_discover_files_nested_config_values(tmp_path: Path) -> None: + """Deeply nested Path objects in lists/dicts are found.""" + config_dir = _setup_config_dir( + tmp_path, + files={"deep/file.pem": "cert data"}, + ) + + config: dict[str, Any] = { + "level1": {"level2": [{"level3": config_dir / "deep" / "file.pem"}]} + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "deep/file.pem" in paths + + +def test_discover_files_idempotent_secrets(tmp_path: Path) -> None: + """Calling discover_files twice does not accumulate secrets paths.""" + config_dir = _setup_config_dir(tmp_path) + (config_dir / "secrets.yaml").write_text("k: v\n") + (config_dir / "test.yaml").write_text("a: !secret k\n") + + creator = ConfigBundleCreator({}) + files1 = creator.discover_files() + files2 = creator.discover_files() + + # Both calls should return the same result (secrets not accumulated) + paths1 = [f.path for f in files1] + paths2 = [f.path for f in files2] + assert "secrets.yaml" in paths1 + assert paths1 == paths2 + + +def test_discover_files_skips_missing_file(tmp_path: Path) -> None: + """_add_file logs warning for non-existent files via includes.""" + _setup_config_dir(tmp_path) + + # Include references a file that doesn't exist on disk + config: dict[str, Any] = { + "esphome": {"includes": ["nonexistent.h"]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "nonexistent.h" not in paths + + +def test_discover_files_skips_missing_directory(tmp_path: Path) -> None: + """_add_directory logs warning for non-existent directories.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "external_components": [ + {"source": {"type": "local", "path": "nonexistent_dir"}} + ], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + # Only the config file + assert len(files) == 1 + + +def test_discover_files_yaml_reload_failure( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """YAML reload failure during include discovery is handled gracefully.""" + _setup_config_dir(tmp_path) + + def _raise_error(*args, **kwargs): + raise EsphomeError("parse error") + + monkeypatch.setattr("esphome.yaml_util.load_yaml", _raise_error) + + creator = ConfigBundleCreator({}) + files = creator.discover_files() + + # Should still have the config file at minimum + paths = [f.path for f in files] + assert "test.yaml" in paths + + +def test_discover_files_esphome_includes_c(tmp_path: Path) -> None: + """Paths listed in esphome.includes_c are discovered.""" + _setup_config_dir( + tmp_path, + files={"my_code.c": "// c code"}, + ) + + config: dict[str, Any] = { + "esphome": {"includes_c": ["my_code.c"]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "my_code.c" in paths + + +def test_discover_files_external_components_non_local_type(tmp_path: Path) -> None: + """external_components with type != 'local' are skipped.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "external_components": [ + {"source": {"type": "git", "url": "https://github.com/user/repo"}} + ], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + assert len(files) == 1 + + +def test_discover_files_external_components_no_path(tmp_path: Path) -> None: + """external_components with local type but missing path are skipped.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "external_components": [{"source": {"type": "local"}}], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + assert len(files) == 1 + + +def test_discover_files_external_components_absolute_path(tmp_path: Path) -> None: + """external_components with absolute path are resolved correctly.""" + config_dir = _setup_config_dir( + tmp_path, + files={"ext/comp/__init__.py": "# comp"}, + ) + + abs_path = str(config_dir / "ext") + config: dict[str, Any] = { + "external_components": [{"source": {"type": "local", "path": abs_path}}], + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "ext/comp/__init__.py" in paths + + +def test_discover_files_relative_string_with_known_extension(tmp_path: Path) -> None: + """Relative strings with known extensions are resolved and warned.""" + _setup_config_dir( + tmp_path, + files={"my_cert.pem": "cert data"}, + ) + + config: dict[str, Any] = { + "component": {"cert": "my_cert.pem"}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "my_cert.pem" in paths + + +def test_discover_files_relative_string_missing_file(tmp_path: Path) -> None: + """Relative strings with known extensions that don't exist are skipped.""" + _setup_config_dir(tmp_path) + + config: dict[str, Any] = { + "component": {"cert": "nonexistent.pem"}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + assert len(files) == 1 + + +def test_discover_files_esphome_includes_absolute_path(tmp_path: Path) -> None: + """esphome.includes with absolute path is handled.""" + config_dir = _setup_config_dir( + tmp_path, + files={"my_code.h": "#pragma once"}, + ) + + config: dict[str, Any] = { + "esphome": {"includes": [str(config_dir / "my_code.h")]}, + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "my_code.h" in paths + + +def test_discover_files_walk_tuple_values(tmp_path: Path) -> None: + """Tuples in config are walked like lists.""" + config_dir = _setup_config_dir( + tmp_path, + files={"a.pem": "cert"}, + ) + + config: dict[str, Any] = { + "items": (config_dir / "a.pem",), + } + creator = ConfigBundleCreator(config) + files = creator.discover_files() + + paths = [f.path for f in files] + assert "a.pem" in paths + + +# --------------------------------------------------------------------------- +# ConfigBundleCreator - create_bundle +# --------------------------------------------------------------------------- + + +def test_create_bundle_produces_valid_archive(tmp_path: Path) -> None: + _setup_config_dir(tmp_path) + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + assert isinstance(result.data, bytes) + assert len(result.data) > 0 + + # Verify it's a valid tar.gz + buf = io.BytesIO(result.data) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + names = tar.getnames() + assert MANIFEST_FILENAME in names + assert "test.yaml" in names + + +def test_create_bundle_manifest_content(tmp_path: Path) -> None: + _setup_config_dir(tmp_path) + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + manifest = result.manifest + assert manifest[ManifestKey.MANIFEST_VERSION] == CURRENT_MANIFEST_VERSION + assert manifest[ManifestKey.CONFIG_FILENAME] == "test.yaml" + assert "test.yaml" in manifest[ManifestKey.FILES] + + +def test_create_bundle_filters_secrets(tmp_path: Path) -> None: + config_dir = _setup_config_dir(tmp_path) + + # Create secrets.yaml with multiple secrets + secrets = config_dir / "secrets.yaml" + secrets.write_text( + "wifi_ssid: MyNetwork\nwifi_pw: secret123\nunused: should_not_appear\n" + ) + + # Config that references only some secrets + config_yaml = "wifi:\n ssid: !secret wifi_ssid\n password: !secret wifi_pw\n" + (config_dir / "test.yaml").write_text(config_yaml) + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + # Extract and check secrets + buf = io.BytesIO(result.data) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + secrets_data = tar.extractfile("secrets.yaml").read().decode() + + assert "wifi_ssid" in secrets_data + assert "wifi_pw" in secrets_data + assert "unused" not in secrets_data + assert "should_not_appear" not in secrets_data + + +def test_create_bundle_no_secrets(tmp_path: Path) -> None: + _setup_config_dir(tmp_path) + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + assert result.manifest[ManifestKey.HAS_SECRETS] is False + + +def test_create_bundle_secrets_load_failure( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Secrets file that fails to load during filtering is skipped gracefully.""" + config_dir = _setup_config_dir(tmp_path) + (config_dir / "secrets.yaml").write_text("k: v\n") + (config_dir / "test.yaml").write_text("a: !secret k\n") + + from esphome import yaml_util as yu + + original_load = yu.load_yaml + + def _failing_on_filter(fname, *args, clear_secrets=True, **kwargs): + # Fail only when _build_filtered_secrets calls with clear_secrets=False + if not clear_secrets and "secrets" in str(fname): + raise EsphomeError("corrupt secrets") + return original_load(fname, *args, clear_secrets=clear_secrets, **kwargs) + + monkeypatch.setattr(yu, "load_yaml", _failing_on_filter) + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + # Should succeed without secrets since the filtered load failed + assert result.manifest[ManifestKey.HAS_SECRETS] is False + + +def test_create_bundle_secrets_non_dict(tmp_path: Path) -> None: + """Secrets file that parses to non-dict is skipped.""" + config_dir = _setup_config_dir(tmp_path) + (config_dir / "secrets.yaml").write_text("- item1\n- item2\n") + (config_dir / "test.yaml").write_text("a: !secret k\n") + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + assert result.manifest[ManifestKey.HAS_SECRETS] is False + + +def test_create_bundle_secrets_no_matching_keys(tmp_path: Path) -> None: + """Secrets with no matching keys produces empty filtered result.""" + config_dir = _setup_config_dir(tmp_path) + (config_dir / "secrets.yaml").write_text("other_key: value\n") + (config_dir / "test.yaml").write_text("a: !secret nonexistent\n") + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + assert result.manifest[ManifestKey.HAS_SECRETS] is False + + +def test_create_bundle_deterministic_order(tmp_path: Path) -> None: + """Files are added in sorted order for reproducibility.""" + _setup_config_dir( + tmp_path, + files={ + "z_last.h": "// z", + "a_first.h": "// a", + "m_middle.h": "// m", + }, + ) + + config: dict[str, Any] = { + "esphome": {"includes": ["z_last.h", "a_first.h", "m_middle.h"]}, + } + creator = ConfigBundleCreator(config) + result = creator.create_bundle() + + buf = io.BytesIO(result.data) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + names = tar.getnames() + + # manifest.json is always first, then files in sorted order + assert names[0] == MANIFEST_FILENAME + file_names = [n for n in names if n != MANIFEST_FILENAME] + assert file_names == sorted(file_names) + + +# --------------------------------------------------------------------------- +# Round-trip: create then extract +# --------------------------------------------------------------------------- + + +def test_bundle_round_trip(tmp_path: Path) -> None: + """A bundle created by ConfigBundleCreator can be extracted.""" + _setup_config_dir( + tmp_path, + files={"include.h": "#pragma once\n"}, + ) + config: dict[str, Any] = {"esphome": {"includes": ["include.h"]}} + + creator = ConfigBundleCreator(config) + result = creator.create_bundle() + + bundle_path = tmp_path / f"roundtrip{BUNDLE_EXTENSION}" + bundle_path.write_bytes(result.data) + + target = tmp_path / "extracted" + config_path = extract_bundle(bundle_path, target) + + assert config_path.is_file() + assert (target / "include.h").read_text() == "#pragma once\n" + + manifest = read_bundle_manifest(bundle_path) + assert manifest.config_filename == "test.yaml" + assert "include.h" in manifest.files + + +def test_bundle_round_trip_with_secrets(tmp_path: Path) -> None: + """Secrets survive round-trip with correct filtering.""" + config_dir = _setup_config_dir(tmp_path) + (config_dir / "secrets.yaml").write_text("key1: val1\nkey2: val2\nunused: nope\n") + (config_dir / "test.yaml").write_text("a: !secret key1\nb: !secret key2\n") + + creator = ConfigBundleCreator({}) + result = creator.create_bundle() + + bundle_path = tmp_path / f"secrets{BUNDLE_EXTENSION}" + bundle_path.write_bytes(result.data) + + target = tmp_path / "extracted" + extract_bundle(bundle_path, target) + + secrets_content = (target / "secrets.yaml").read_text() + assert "key1" in secrets_content + assert "key2" in secrets_content + assert "unused" not in secrets_content + + manifest = read_bundle_manifest(bundle_path) + assert manifest.has_secrets is True diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index 115ce38c93..85536d2f1c 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -24,6 +24,7 @@ from esphome.__main__ import ( _make_crystal_freq_callback, choose_upload_log_host, command_analyze_memory, + command_bundle, command_clean_all, command_rename, command_update_all, @@ -47,6 +48,7 @@ from esphome.__main__ import ( upload_using_picotool, upload_using_platformio, ) +from esphome.bundle import BUNDLE_EXTENSION, BundleFile, BundleResult from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32 from esphome.const import ( CONF_API, @@ -1101,6 +1103,8 @@ class MockArgs: name: str | None = None dashboard: bool = False reset: bool = False + list_only: bool = False + output: str | None = None def test_upload_program_serial_esp32( @@ -3765,6 +3769,198 @@ esp32: assert "secrets.yaml" not in summary_section +# --- command_bundle tests --- + + +def test_command_bundle_list_only( + tmp_path: Path, + capsys: CaptureFixture[str], +) -> None: + """Test command_bundle with --list-only prints files and returns 0.""" + mock_files = [ + BundleFile(path="device.yaml", source=tmp_path / "device.yaml"), + BundleFile(path="secrets.yaml", source=tmp_path / "secrets.yaml"), + BundleFile(path="common/base.yaml", source=tmp_path / "common" / "base.yaml"), + ] + + args = MockArgs(list_only=True) + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.discover_files.return_value = mock_files + + with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator): + result = command_bundle(args, config) + + assert result == 0 + captured = capsys.readouterr() + # Files should be printed in sorted order + assert "common/base.yaml" in captured.out + assert "device.yaml" in captured.out + assert "secrets.yaml" in captured.out + + +def test_command_bundle_list_only_empty( + tmp_path: Path, + capsys: CaptureFixture[str], +) -> None: + """Test command_bundle --list-only with no files discovered.""" + args = MockArgs(list_only=True) + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.discover_files.return_value = [] + + with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator): + result = command_bundle(args, config) + + assert result == 0 + + +def test_command_bundle_creates_archive(tmp_path: Path) -> None: + """Test command_bundle creates archive at default output path.""" + CORE.config_path = tmp_path / "mydevice.yaml" + + mock_result = BundleResult( + data=b"fake-tar-gz-data", + manifest={"manifest_version": 1}, + files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")], + ) + + args = MockArgs() + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.create_bundle.return_value = mock_result + + with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator): + result = command_bundle(args, config) + + assert result == 0 + output_path = tmp_path / f"mydevice{BUNDLE_EXTENSION}" + assert output_path.exists() + assert output_path.read_bytes() == b"fake-tar-gz-data" + + +def test_command_bundle_custom_output(tmp_path: Path) -> None: + """Test command_bundle with -o custom output path.""" + custom_output = tmp_path / "output" / "custom.esphomebundle.tar.gz" + mock_result = BundleResult( + data=b"custom-output-data", + manifest={"manifest_version": 1}, + files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")], + ) + + args = MockArgs(output=str(custom_output)) + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.create_bundle.return_value = mock_result + + with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator): + result = command_bundle(args, config) + + assert result == 0 + assert custom_output.exists() + assert custom_output.read_bytes() == b"custom-output-data" + + +def test_command_bundle_creates_parent_dirs(tmp_path: Path) -> None: + """Test command_bundle creates parent directories for output path.""" + nested_output = tmp_path / "deep" / "nested" / "dir" / "out.tar.gz" + mock_result = BundleResult( + data=b"data", + manifest={"manifest_version": 1}, + files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")], + ) + + args = MockArgs(output=str(nested_output)) + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.create_bundle.return_value = mock_result + + with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator): + result = command_bundle(args, config) + + assert result == 0 + assert nested_output.exists() + + +def test_command_bundle_logs_info( + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test command_bundle logs bundle creation info.""" + CORE.config_path = tmp_path / "mydevice.yaml" + + mock_result = BundleResult( + data=b"x" * 2048, + manifest={"manifest_version": 1}, + files=[ + BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml"), + BundleFile(path="secrets.yaml", source=tmp_path / "secrets.yaml"), + ], + ) + + args = MockArgs() + config: dict[str, Any] = {} + + mock_creator = MagicMock() + mock_creator.create_bundle.return_value = mock_result + + with ( + patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator), + caplog.at_level(logging.INFO), + ): + result = command_bundle(args, config) + + assert result == 0 + assert "Bundle created" in caplog.text + assert "2 files" in caplog.text + assert "2.0 KB" in caplog.text + + +def test_run_esphome_bundle_detection(tmp_path: Path) -> None: + """Test run_esphome detects .esphomebundle.tar.gz and extracts it.""" + bundle_path = tmp_path / f"device{BUNDLE_EXTENSION}" + bundle_path.write_bytes(b"fake-bundle") + + extracted_yaml = tmp_path / "extracted" / "device.yaml" + + with ( + patch("esphome.bundle.is_bundle_path", return_value=True) as mock_is_bundle, + patch( + "esphome.bundle.prepare_bundle_for_compile", + return_value=extracted_yaml, + ) as mock_prepare, + patch("esphome.__main__.read_config", return_value=None), + ): + result = run_esphome(["esphome", "compile", str(bundle_path)]) + + mock_is_bundle.assert_called_once() + mock_prepare.assert_called_once_with(bundle_path) + # read_config returns None → exit code 2 + assert result == 2 + + +def test_run_esphome_non_bundle_skips_extraction(tmp_path: Path) -> None: + """Test run_esphome does not extract for regular .yaml files.""" + yaml_file = tmp_path / "device.yaml" + yaml_file.write_text("esphome:\n name: test\n") + + with ( + patch("esphome.bundle.is_bundle_path", return_value=False) as mock_is_bundle, + patch("esphome.bundle.prepare_bundle_for_compile") as mock_prepare, + patch("esphome.__main__.read_config", return_value=None), + ): + result = run_esphome(["esphome", "compile", str(yaml_file)]) + + mock_is_bundle.assert_called_once() + mock_prepare.assert_not_called() + assert result == 2 + + def test_get_configured_xtal_freq_reads_sdkconfig(tmp_path: Path) -> None: """Test reading XTAL_FREQ from sdkconfig.""" CORE.name = "test-device" diff --git a/tests/unit_tests/test_yaml_util.py b/tests/unit_tests/test_yaml_util.py index 667b593819..0342d12540 100644 --- a/tests/unit_tests/test_yaml_util.py +++ b/tests/unit_tests/test_yaml_util.py @@ -323,6 +323,60 @@ def test_dump_sort_keys() -> None: assert sorted_dump.index("a_key:") < sorted_dump.index("z_key:") +# --------------------------------------------------------------------------- +# track_yaml_loads +# --------------------------------------------------------------------------- + + +def test_track_yaml_loads_records_files(tmp_path: Path) -> None: + """track_yaml_loads records every file loaded inside the context.""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text("key: value\n") + + with yaml_util.track_yaml_loads() as loaded: + yaml_util.load_yaml(yaml_file) + + assert len(loaded) == 1 + assert loaded[0] == yaml_file.resolve() + + +def test_track_yaml_loads_records_includes(tmp_path: Path) -> None: + """track_yaml_loads records nested !include files.""" + inc = tmp_path / "included.yaml" + inc.write_text("included_key: 42\n") + main = tmp_path / "main.yaml" + main.write_text("child: !include included.yaml\n") + + with yaml_util.track_yaml_loads() as loaded: + yaml_util.load_yaml(main) + + resolved = [p.name for p in loaded] + assert "main.yaml" in resolved + assert "included.yaml" in resolved + + +def test_track_yaml_loads_empty_outside_context(tmp_path: Path) -> None: + """Files loaded outside the context are not recorded.""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text("key: value\n") + + with yaml_util.track_yaml_loads() as loaded: + pass # load nothing inside + + yaml_util.load_yaml(yaml_file) + assert loaded == [] + + +def test_track_yaml_loads_cleanup_on_exception(tmp_path: Path) -> None: + """Listener is removed even if the body raises.""" + before = len(yaml_util._load_listeners) + + with pytest.raises(RuntimeError), yaml_util.track_yaml_loads(): + raise RuntimeError("boom") + + assert len(yaml_util._load_listeners) == before + + @pytest.mark.parametrize( "data", [