[nrf52] native build - download toolchain and sdk in venv (#16388)

Co-authored-by: Jonathan Swoboda <154711427+swoboda1337@users.noreply.github.com>
Co-authored-by: Jonathan Swoboda <swoboda1337@users.noreply.github.com>
This commit is contained in:
tomaszduda23
2026-06-09 13:04:51 +02:00
committed by GitHub
parent 25d656d468
commit 5faed9d5f5
12 changed files with 2624 additions and 565 deletions

View File

@@ -894,6 +894,13 @@ class TestEsphomeCore:
"foo/build/.pioenvs/test-device/bootloader.bin"
)
def test_using_toolchain_sdk_nrf(self, target):
"""using_toolchain_sdk_nrf is True only for the SDK_NRF toolchain."""
target.toolchain = const.Toolchain.SDK_NRF
assert target.using_toolchain_sdk_nrf is True
target.toolchain = const.Toolchain.ESP_IDF
assert target.using_toolchain_sdk_nrf is False
def test_add_library__extracts_short_name_from_path(self, target):
"""Test add_library extracts short name from library paths like owner/lib."""
target.data[const.KEY_CORE] = {

View File

@@ -2,12 +2,32 @@
# pylint: disable=protected-access
import io
import json
from pathlib import Path
import tarfile
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from esphome.espidf.framework import _clone_idf_with_submodules, _parse_git_source
from esphome.espidf.framework import (
_check_stamp,
_clone_idf_with_submodules,
_get_framework_path,
_get_idf_tool_paths,
_get_idf_tools_path,
_get_idf_version,
_get_python_env_path,
_get_python_version,
_parse_git_source,
_patch_tools_json_for_linux_arm64,
_write_idf_version_txt,
_write_stamp,
check_esp_idf_install,
get_framework_env,
)
from esphome.framework_helpers import _tar_extract_all, get_python_env_executable_path
@pytest.mark.parametrize(
@@ -154,3 +174,511 @@ def test_clone_idf_with_submodules_raises_when_tree_missing(
"https://github.com/espressif/esp-idf.git",
None,
)
# ---------------------------------------------------------------------------
# Helpers for _tar_extract_all hard-link prefix-stripping tests
# ---------------------------------------------------------------------------
def _make_tar(
members: list[tarfile.TarInfo], file_contents: dict[str, bytes]
) -> io.BytesIO:
"""Build an in-memory tar archive from a list of TarInfo objects."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
for info in members:
if info.isreg() and info.name in file_contents:
data = file_contents[info.name]
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
else:
tf.addfile(info)
buf.seek(0)
return buf
def _regular(name: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.REGTYPE
info.size = 0
info.mode = 0o644
return info
def _hardlink(name: str, linkname: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.LNKTYPE
info.linkname = linkname
info.size = 0
info.mode = 0o644
return info
class TestTarExtractHardLinkPrefixStripping:
"""
Covers the hard-link prefix-stripping block in _tar_extract_all (L528-541).
Archive layout used by every test:
wrapper/ ← single top-level wrapper dir (stripped)
wrapper/target.txt ← regular file; becomes target.txt in dest
wrapper/link_good ← hard link to wrapper/target.txt (kept, linkname stripped)
wrapper/link_exact_root ← hard link to "wrapper" (skipped equals strip_root)
wrapper/link_exact_prefix ← hard link to "wrapper/" (skipped equals strip_prefix)
wrapper/link_outside ← hard link to "other/target.txt" (skipped not under prefix)
"""
WRAPPER = "wrapper"
def _build_archive(self) -> io.BytesIO:
members = [
_regular(f"{self.WRAPPER}/"),
_regular(f"{self.WRAPPER}/target.txt"),
_hardlink(f"{self.WRAPPER}/link_good", f"{self.WRAPPER}/target.txt"),
_hardlink(f"{self.WRAPPER}/link_exact_root", self.WRAPPER),
_hardlink(f"{self.WRAPPER}/link_exact_prefix", f"{self.WRAPPER}/"),
_hardlink(f"{self.WRAPPER}/link_outside", "other/target.txt"),
]
return _make_tar(members, {f"{self.WRAPPER}/target.txt": b"hello"})
def test_good_hardlink_is_extracted_with_stripped_linkname(
self, tmp_path: Path
) -> None:
"""Hard link whose linkname starts with wrapper/ is extracted and its
linkname has the prefix removed so tarfile can resolve the target."""
_tar_extract_all(self._build_archive(), tmp_path)
link = tmp_path / "link_good"
assert link.exists(), "link_good should have been extracted"
assert link.read_bytes() == b"hello"
def test_hardlink_equal_to_strip_root_is_skipped(self, tmp_path: Path) -> None:
"""Hard link whose linkname equals strip_root exactly must be dropped."""
_tar_extract_all(self._build_archive(), tmp_path)
assert not (tmp_path / "link_exact_root").exists()
def test_hardlink_equal_to_strip_prefix_is_skipped(self, tmp_path: Path) -> None:
"""Hard link whose linkname equals strip_prefix (strip_root + '/') must be dropped."""
_tar_extract_all(self._build_archive(), tmp_path)
assert not (tmp_path / "link_exact_prefix").exists()
def test_hardlink_outside_prefix_is_skipped(self, tmp_path: Path) -> None:
"""Hard link whose linkname does not start with wrapper/ must be dropped."""
_tar_extract_all(self._build_archive(), tmp_path)
assert not (tmp_path / "link_outside").exists()
def test_regular_file_and_no_spurious_files(self, tmp_path: Path) -> None:
"""Sanity check: target.txt is extracted and no unexpected files appear."""
_tar_extract_all(self._build_archive(), tmp_path)
assert (tmp_path / "target.txt").read_bytes() == b"hello"
extracted = {p.name for p in tmp_path.iterdir()}
assert extracted == {"target.txt", "link_good"}
_IDF_VERSION = "5.1.2"
@pytest.fixture
def espidf_mocks(setup_core: Path):
"""Patch the heavy I/O of check_esp_idf_install and pre-create the framework dir."""
# archive_extract_all is mocked, so pre-create the framework dir that the
# extracted-marker touch writes into.
_get_framework_path(_IDF_VERSION).mkdir(parents=True, exist_ok=True)
with (
patch("esphome.espidf.framework.rmdir"),
patch(
"esphome.espidf.framework.download_from_mirrors",
return_value="https://example.com/idf.tar.xz",
) as download,
patch("esphome.espidf.framework.archive_extract_all") as extract,
patch("esphome.espidf.framework.create_venv") as venv,
patch("esphome.espidf.framework.run_command_ok", return_value=True) as run_ok,
patch("esphome.espidf.framework._clone_idf_with_submodules") as clone,
patch("esphome.espidf.framework._write_idf_version_txt"),
patch("esphome.espidf.framework._patch_tools_json_for_linux_arm64"),
patch("esphome.espidf.framework._write_stamp"),
patch("esphome.espidf.framework._check_stamp", return_value=True),
patch("esphome.espidf.framework._get_idf_version", return_value=_IDF_VERSION),
patch("esphome.espidf.framework._get_python_version", return_value="3.11.0"),
patch("esphome.espidf.framework.get_system_python_path", return_value="python"),
):
yield SimpleNamespace(
download=download, extract=extract, venv=venv, run_ok=run_ok, clone=clone
)
def test_check_esp_idf_install_fresh(espidf_mocks: SimpleNamespace) -> None:
"""A forced install drives download/extract, venv creation, and pip installs."""
framework_path, python_env_path = check_esp_idf_install(_IDF_VERSION, force=True)
assert framework_path == _get_framework_path(_IDF_VERSION)
assert python_env_path == _get_python_env_path(_IDF_VERSION)
# framework tarball + python-env constraints file are both downloaded
assert espidf_mocks.download.call_count == 2
espidf_mocks.extract.assert_called_once()
espidf_mocks.venv.assert_called_once()
espidf_mocks.clone.assert_not_called()
def test_check_esp_idf_install_git_source(espidf_mocks: SimpleNamespace) -> None:
"""A git source_url clones instead of downloading; explicit tools skip discovery."""
check_esp_idf_install(
_IDF_VERSION,
force=True,
source_url="https://github.com/espressif/esp-idf.git",
tools=["xtensa-esp-elf"],
)
espidf_mocks.clone.assert_called_once()
# framework is cloned, so only the python-env constraints file is downloaded
assert espidf_mocks.download.call_count == 1
def test_check_esp_idf_install_already_installed(espidf_mocks: SimpleNamespace) -> None:
"""Marker + matching stamps + existing python env → nothing is re-installed."""
framework_path = _get_framework_path(_IDF_VERSION)
(framework_path / ".esphome_extracted").touch()
python_env_path = _get_python_env_path(_IDF_VERSION)
env_python = get_python_env_executable_path(python_env_path, "python")
env_python.parent.mkdir(parents=True, exist_ok=True)
env_python.touch()
check_esp_idf_install(_IDF_VERSION)
espidf_mocks.extract.assert_not_called()
espidf_mocks.venv.assert_not_called()
def test_check_esp_idf_install_framework_failure(espidf_mocks: SimpleNamespace) -> None:
"""A failing idf_tools install raises."""
espidf_mocks.run_ok.side_effect = [False]
with pytest.raises(RuntimeError, match="framework installation failure"):
check_esp_idf_install(_IDF_VERSION, force=True)
def test_check_esp_idf_install_pip_upgrade_failure(
espidf_mocks: SimpleNamespace,
) -> None:
"""A failing pip upgrade in the python env raises (framework install ok)."""
espidf_mocks.run_ok.side_effect = [True, False]
with pytest.raises(RuntimeError, match="Python environment packages failure"):
check_esp_idf_install(_IDF_VERSION, force=True)
def test_check_esp_idf_install_feature_failure(espidf_mocks: SimpleNamespace) -> None:
"""A failing feature requirements install raises."""
espidf_mocks.run_ok.side_effect = [True, True, False]
with pytest.raises(RuntimeError, match="Python dependencies for"):
check_esp_idf_install(_IDF_VERSION, force=True, features=["fb"])
def _mark_installed() -> None:
"""Create the extracted marker and python-env interpreter so the install
check takes the already-installed path rather than force-installing."""
(_get_framework_path(_IDF_VERSION) / ".esphome_extracted").touch()
env_python = get_python_env_executable_path(
_get_python_env_path(_IDF_VERSION), "python"
)
env_python.parent.mkdir(parents=True, exist_ok=True)
env_python.touch()
def test_check_esp_idf_install_stamp_mismatch_reinstalls(
espidf_mocks: SimpleNamespace,
) -> None:
"""A stamp mismatch reinstalls tools (marker present, so no re-extract)."""
_mark_installed()
with patch("esphome.espidf.framework._check_stamp", return_value=False):
check_esp_idf_install(_IDF_VERSION)
espidf_mocks.extract.assert_not_called() # marker present -> no re-extract
espidf_mocks.venv.assert_called_once() # tools reinstall -> venv rebuilt
def test_check_esp_idf_install_check_command_failure_reinstalls(
espidf_mocks: SimpleNamespace,
) -> None:
"""A failing idf_tools check reinstalls tools (marker present, no re-extract)."""
_mark_installed()
# idf_tools check fails -> install stays True; the later installs succeed.
espidf_mocks.run_ok.side_effect = [False, True, True, True]
check_esp_idf_install(_IDF_VERSION, features=["fb"])
espidf_mocks.extract.assert_not_called()
espidf_mocks.venv.assert_called_once()
def test_check_esp_idf_install_unknown_python_version_reinstalls(
espidf_mocks: SimpleNamespace,
) -> None:
"""An undeterminable python version rebuilds the venv (framework stamp still ok)."""
_mark_installed()
with patch("esphome.espidf.framework._get_python_version", return_value=None):
check_esp_idf_install(_IDF_VERSION)
espidf_mocks.extract.assert_not_called() # framework stamp matched
espidf_mocks.venv.assert_called_once() # python env rebuilt
def test_check_esp_idf_install_python_stamp_mismatch_rebuilds_venv(
espidf_mocks: SimpleNamespace,
) -> None:
"""Framework stamp matches but the python-env stamp does not -> venv rebuilt."""
# _check_stamp passes for the framework (no python_version key) and fails
# for the python env (carries python_version), so only the venv rebuilds.
def stamp_ok(_stamp_file, info: dict) -> bool:
return "python_version" not in info
_mark_installed()
with patch("esphome.espidf.framework._check_stamp", side_effect=stamp_ok):
check_esp_idf_install(_IDF_VERSION)
espidf_mocks.extract.assert_not_called()
espidf_mocks.venv.assert_called_once()
def test_check_esp_idf_install_unparseable_version(
espidf_mocks: SimpleNamespace,
) -> None:
"""A non-semver version skips the MAJOR/MINOR substitutions without erroring."""
bad_version = "main"
_get_framework_path(bad_version).mkdir(parents=True, exist_ok=True)
check_esp_idf_install(bad_version, force=True)
espidf_mocks.extract.assert_called_once()
# ---------------------------------------------------------------------------
# _patch_tools_json_for_linux_arm64 (arm64-only ninja backport)
# ---------------------------------------------------------------------------
def _write_tools_json(framework_path: Path, data: dict) -> Path:
tools_dir = framework_path / "tools"
tools_dir.mkdir(parents=True, exist_ok=True)
tools_json = tools_dir / "tools.json"
tools_json.write_text(json.dumps(data), encoding="utf-8")
return tools_json
def test_patch_tools_json_non_aarch64_is_noop(tmp_path: Path) -> None:
tools_json = _write_tools_json(
tmp_path, {"tools": [{"name": "ninja", "versions": [{"name": "1.12.1"}]}]}
)
before = tools_json.read_text(encoding="utf-8")
with patch("esphome.espidf.framework.platform.machine", return_value="x86_64"):
_patch_tools_json_for_linux_arm64(tmp_path)
assert tools_json.read_text(encoding="utf-8") == before
def test_patch_tools_json_missing_file_is_noop(tmp_path: Path) -> None:
with patch("esphome.espidf.framework.platform.machine", return_value="aarch64"):
_patch_tools_json_for_linux_arm64(tmp_path) # no tools/tools.json present
def test_patch_tools_json_corrupt_file_warns_and_skips(tmp_path: Path) -> None:
(tmp_path / "tools").mkdir()
(tmp_path / "tools" / "tools.json").write_text("{ not json", encoding="utf-8")
with patch("esphome.espidf.framework.platform.machine", return_value="aarch64"):
_patch_tools_json_for_linux_arm64(tmp_path) # JSONDecodeError -> skip
def test_patch_tools_json_injects_ninja_arm64(tmp_path: Path) -> None:
tools_json = _write_tools_json(
tmp_path,
{
"tools": [
{"name": "ninja", "versions": [{"name": "1.12.1"}]},
{"name": "cmake", "versions": [{"name": "3.24.0"}]},
]
},
)
with patch("esphome.espidf.framework.platform.machine", return_value="aarch64"):
_patch_tools_json_for_linux_arm64(tmp_path)
data = json.loads(tools_json.read_text(encoding="utf-8"))
ninja = next(t for t in data["tools"] if t["name"] == "ninja")
assert "linux-arm64" in ninja["versions"][0]
assert ninja["versions"][0]["linux-arm64"]["size"] == 121787
def test_patch_tools_json_already_patched_is_noop(tmp_path: Path) -> None:
tools_json = _write_tools_json(
tmp_path,
{
"tools": [
{
"name": "ninja",
"versions": [{"name": "1.12.1", "linux-arm64": {"url": "x"}}],
}
]
},
)
before = tools_json.read_text(encoding="utf-8")
with patch("esphome.espidf.framework.platform.machine", return_value="aarch64"):
_patch_tools_json_for_linux_arm64(tmp_path)
assert tools_json.read_text(encoding="utf-8") == before
# ---------------------------------------------------------------------------
# Subprocess-backed helpers (_exec -> run_command rename) and get_framework_env
# ---------------------------------------------------------------------------
def test_get_idf_version_parses_stdout(tmp_path: Path) -> None:
with patch(
"esphome.espidf.framework.run_command", return_value=(True, "5.1.2\n", "")
):
assert _get_idf_version(tmp_path) == "5.1.2"
def test_get_idf_version_raises_on_failure(tmp_path: Path) -> None:
with (
patch("esphome.espidf.framework.run_command", return_value=(False, "", "boom")),
pytest.raises(RuntimeError, match="Can't get ESP-IDF version"),
):
_get_idf_version(tmp_path)
def test_get_idf_tool_paths_parses_json(tmp_path: Path) -> None:
payload = json.dumps({"paths_to_export": ["/a", "/b"], "export_vars": {"X": "1"}})
with patch(
"esphome.espidf.framework.run_command", return_value=(True, payload, "")
):
paths, export_vars = _get_idf_tool_paths(tmp_path)
assert paths == ["/a", "/b"]
assert export_vars == {"X": "1"}
def test_get_idf_tool_paths_raises_on_bad_json(tmp_path: Path) -> None:
with (
patch(
"esphome.espidf.framework.run_command", return_value=(True, "not json", "")
),
pytest.raises(RuntimeError, match="Can't extract ESP-IDF tool paths"),
):
_get_idf_tool_paths(tmp_path)
def test_get_idf_tool_paths_raises_on_failure(tmp_path: Path) -> None:
with (
patch("esphome.espidf.framework.run_command", return_value=(False, "", "err")),
pytest.raises(RuntimeError, match="Can't get ESP-IDF tool paths"),
):
_get_idf_tool_paths(tmp_path)
def test_get_python_version_parses_stdout(tmp_path: Path) -> None:
with patch(
"esphome.espidf.framework.run_command", return_value=(True, "3.11.0\n", "")
):
assert _get_python_version(tmp_path / "python") == "3.11.0"
def test_get_python_version_returns_falsy_on_failure(tmp_path: Path) -> None:
with patch("esphome.espidf.framework.run_command", return_value=(False, "", "")):
# non-throwing failure returns the (empty) stdout as-is
assert not _get_python_version(tmp_path / "python")
def test_get_python_version_raises_when_requested(tmp_path: Path) -> None:
with (
patch("esphome.espidf.framework.run_command", return_value=(False, "", "")),
pytest.raises(RuntimeError, match="Can't get Python version"),
):
_get_python_version(tmp_path / "python", throw_exception=True)
def test_write_stamp_writes_json(tmp_path: Path) -> None:
stamp = tmp_path / "stamp.json"
_write_stamp(stamp, {"a": "1", "b": "2"})
assert json.loads(stamp.read_text(encoding="utf-8")) == {"a": "1", "b": "2"}
def test_get_framework_env_with_python_env(tmp_path: Path) -> None:
with (
patch(
"esphome.espidf.framework._get_idf_tools_path",
return_value=tmp_path / "tools",
),
patch("esphome.espidf.framework._get_idf_version", return_value="5.1.2"),
patch(
"esphome.espidf.framework._get_idf_tool_paths",
return_value=(["/tool/bin"], {"IDF_X": "1"}),
),
):
env = get_framework_env(
tmp_path / "fw", tmp_path / "penv", {"PATH": "/usr/bin"}
)
assert env["IDF_PATH"] == str(tmp_path / "fw")
assert env["ESP_IDF_VERSION"] == "5.1.2"
assert env["IDF_X"] == "1"
assert env["IDF_PYTHON_ENV_PATH"] == str(tmp_path / "penv")
assert "/tool/bin" in env["PATH"]
def test_get_framework_env_without_python_env_uses_os_path(tmp_path: Path) -> None:
with (
patch(
"esphome.espidf.framework._get_idf_tools_path",
return_value=tmp_path / "tools",
),
patch("esphome.espidf.framework._get_idf_version", return_value="5.1.2"),
patch("esphome.espidf.framework._get_idf_tool_paths", return_value=([], {})),
):
env = get_framework_env(tmp_path / "fw")
assert "IDF_PYTHON_ENV_PATH" not in env
assert env["PATH"] # taken from os.environ
# ---------------------------------------------------------------------------
# _check_stamp / _write_idf_version_txt / _get_idf_tools_path
# ---------------------------------------------------------------------------
def test_check_stamp_matches(tmp_path: Path) -> None:
f = tmp_path / "s.json"
f.write_text(json.dumps({"a": "1"}), encoding="utf-8")
assert _check_stamp(f, {"a": "1"}) is True
def test_check_stamp_mismatch(tmp_path: Path) -> None:
f = tmp_path / "s.json"
f.write_text(json.dumps({"a": "1"}), encoding="utf-8")
assert _check_stamp(f, {"a": "2"}) is False
def test_check_stamp_missing_file(tmp_path: Path) -> None:
assert _check_stamp(tmp_path / "nope.json", {"a": "1"}) is False
def test_check_stamp_corrupt_file(tmp_path: Path) -> None:
f = tmp_path / "s.json"
f.write_text("{ not json", encoding="utf-8")
assert _check_stamp(f, {"a": "1"}) is False
def test_write_idf_version_txt_writes_when_missing(tmp_path: Path) -> None:
_write_idf_version_txt(tmp_path, "5.1.2")
assert (tmp_path / "version.txt").read_text(encoding="utf-8") == "v5.1.2\n"
def test_write_idf_version_txt_skips_when_present(tmp_path: Path) -> None:
(tmp_path / "version.txt").write_text("existing\n", encoding="utf-8")
_write_idf_version_txt(tmp_path, "5.1.2")
assert (tmp_path / "version.txt").read_text(encoding="utf-8") == "existing\n"
def test_get_idf_tools_path_env_override(tmp_path: Path) -> None:
override = str(tmp_path / "custom-idf")
with patch.dict("os.environ", {"ESPHOME_ESP_IDF_PREFIX": override}):
assert _get_idf_tools_path() == Path(override)
def test_write_idf_version_txt_warns_on_write_error(tmp_path: Path) -> None:
with patch("pathlib.Path.write_text", side_effect=OSError("denied")):
# write failure is caught and warned, not raised
_write_idf_version_txt(tmp_path, "5.1.2")

View File

@@ -0,0 +1,954 @@
"""Tests for esphome.framework_helpers."""
# pylint: disable=protected-access
import importlib.util
import io
import logging
import os
from pathlib import Path
import subprocess
import sys
import tarfile
from unittest.mock import MagicMock, Mock, patch
import zipfile
import pytest
import requests as req
from esphome.framework_helpers import (
_7z_extract_all,
_detect_archive_root,
_rename_with_retry,
_tar_extract_all,
_zip_extract_all,
archive_extract_all,
create_venv,
download_from_mirrors,
get_python_env_executable_path,
get_system_python_path,
rmdir,
run_command,
run_command_ok,
str_to_lst_of_str,
)
_HAS_PY7ZR = importlib.util.find_spec("py7zr") is not None
# ---------------------------------------------------------------------------
# str_to_lst_of_str
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
("value", "expected"),
[
("a;b;c", ["a", "b", "c"]),
(" a ; b ", ["a", "b"]),
(";; a ;;", ["a"]),
("single", ["single"]),
("", []),
(["already", "a", "list"], ["already", "a", "list"]),
],
)
def test_str_to_lst_of_str(value: str | list, expected: list) -> None:
assert str_to_lst_of_str(value) == expected
# ---------------------------------------------------------------------------
# rmdir
# ---------------------------------------------------------------------------
def test_rmdir_nonexistent_is_noop(tmp_path: Path) -> None:
rmdir(tmp_path / "missing")
def test_rmdir_removes_existing_directory(tmp_path: Path) -> None:
d = tmp_path / "to_remove"
d.mkdir()
(d / "file.txt").write_text("x")
rmdir(d)
assert not d.exists()
def test_rmdir_logs_debug_with_msg(
tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
d = tmp_path / "logged"
d.mkdir()
with caplog.at_level(logging.DEBUG, logger="esphome.framework_helpers"):
rmdir(d, msg="cleanup message")
assert "cleanup message" in caplog.text
def test_rmdir_raises_runtime_error_on_os_error(tmp_path: Path) -> None:
d = tmp_path / "stubborn"
d.mkdir()
with (
patch("esphome.framework_helpers.rmtree", side_effect=OSError("perm denied")),
pytest.raises(RuntimeError, match="can't remove"),
):
rmdir(d, msg="cleanup step")
# ---------------------------------------------------------------------------
# get_system_python_path
# ---------------------------------------------------------------------------
def test_get_system_python_path_returns_env_var() -> None:
with patch.dict(os.environ, {"PYTHONEXEPATH": "/custom/python"}):
assert get_system_python_path() == "/custom/python"
def test_get_system_python_path_falls_back_to_sys_executable() -> None:
env = {k: v for k, v in os.environ.items() if k != "PYTHONEXEPATH"}
with patch.dict(os.environ, env, clear=True):
assert get_system_python_path() == os.path.normpath(sys.executable)
# ---------------------------------------------------------------------------
# get_python_env_executable_path
# ---------------------------------------------------------------------------
@pytest.mark.skipif(os.name != "posix", reason="PosixPath construction requires POSIX")
def test_get_python_env_executable_path_posix() -> None:
assert get_python_env_executable_path("/env", "python") == Path("/env/bin/python")
@pytest.mark.skipif(os.name != "nt", reason="WindowsPath construction requires Windows")
def test_get_python_env_executable_path_windows() -> None:
assert get_python_env_executable_path("/env", "python") == Path(
"/env/Scripts/python.exe"
)
# ---------------------------------------------------------------------------
# run_command
# ---------------------------------------------------------------------------
def test_run_command_success_returns_stdout(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=0, stdout="out\n", stderr="")
ok, stdout, _stderr = run_command(["echo", "hello"])
assert ok is True
assert stdout == "out\n"
def test_run_command_failure_returns_false(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=1, stdout="", stderr="boom")
ok, _stdout, stderr = run_command(["bad"])
assert ok is False
assert stderr == "boom"
def test_run_command_stream_output_success(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=0)
ok, stdout, stderr = run_command(["cmd"], stream_output=True)
assert ok is True
assert stdout is None
assert stderr is None
def test_run_command_stream_output_failure(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=2)
ok, stdout, _stderr = run_command(["cmd"], stream_output=True)
assert ok is False
assert stdout is None
def test_run_command_subprocess_error_returns_false(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.side_effect = subprocess.SubprocessError("exploded")
ok, stdout, stderr = run_command(["cmd"])
assert ok is False
assert stdout is None
assert stderr is None
def test_run_command_os_error_returns_false(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.side_effect = OSError("not found")
ok, _stdout, _stderr = run_command(["cmd"])
assert ok is False
def test_run_command_passes_env(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=0, stdout="", stderr="")
run_command(["cmd"], env={"MY_VAR": "42"})
assert mock_subprocess_run.call_args[1]["env"]["MY_VAR"] == "42"
def test_run_command_passes_cwd(mock_subprocess_run: Mock, tmp_path: Path) -> None:
mock_subprocess_run.return_value = Mock(returncode=0, stdout="", stderr="")
run_command(["cmd"], cwd=str(tmp_path))
assert mock_subprocess_run.call_args[1]["cwd"] == str(tmp_path)
# ---------------------------------------------------------------------------
# run_command_ok
# ---------------------------------------------------------------------------
def test_run_command_ok_true(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=0, stdout="", stderr="")
assert run_command_ok(["cmd"]) is True
def test_run_command_ok_false(mock_subprocess_run: Mock) -> None:
mock_subprocess_run.return_value = Mock(returncode=1, stdout="", stderr="")
assert run_command_ok(["cmd"]) is False
# ---------------------------------------------------------------------------
# create_venv
# ---------------------------------------------------------------------------
def test_create_venv_calls_run_command_ok(tmp_path: Path) -> None:
with patch(
"esphome.framework_helpers.run_command_ok", return_value=True
) as mock_cmd:
create_venv(tmp_path / "env", msg="test")
mock_cmd.assert_called_once()
def test_create_venv_raises_on_failure(tmp_path: Path) -> None:
with (
patch("esphome.framework_helpers.run_command_ok", return_value=False),
pytest.raises(RuntimeError, match="Can't create Python virtual environment"),
):
create_venv(tmp_path / "env", msg="test")
# ---------------------------------------------------------------------------
# _detect_archive_root
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
("names", "expected"),
[
(["wrapper/", "wrapper/a.txt", "wrapper/sub/b.txt"], "wrapper"),
(["root1/a.txt", "root2/b.txt"], None),
(["wrapper"], None), # no descendant → None
(["", "wrapper/file.txt"], "wrapper"), # empty names skipped
(["wrapper\\file.txt"], "wrapper"), # backslash normalised
(["w/a", "w/b", "w/c"], "w"),
],
)
def test_detect_archive_root(names: list[str], expected: str | None) -> None:
assert _detect_archive_root(names) == expected
# ---------------------------------------------------------------------------
# Tar archive helpers
# ---------------------------------------------------------------------------
def _make_tar(
members: list[tarfile.TarInfo],
file_contents: dict[str, bytes] | None = None,
) -> io.BytesIO:
buf = io.BytesIO()
contents = file_contents or {}
with tarfile.open(fileobj=buf, mode="w") as tf:
for info in members:
if info.isreg() and info.name in contents:
data = contents[info.name]
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
else:
tf.addfile(info)
buf.seek(0)
return buf
def _reg(name: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.REGTYPE
info.size = 0
info.mode = 0o644
return info
def _dir(name: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.DIRTYPE
info.mode = 0o755
return info
def _sym(name: str, target: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.SYMTYPE
info.linkname = target
info.mode = 0o777
return info
def _special(name: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.CHRTYPE
info.mode = 0o600
return info
def _hlnk(name: str, target: str) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.type = tarfile.LNKTYPE
info.linkname = target
info.mode = 0o644
return info
# ---------------------------------------------------------------------------
# _tar_extract_all — branches not covered by the hard-link prefix-strip tests
# ---------------------------------------------------------------------------
class TestTarExtractAllSecurity:
def test_flat_archive_no_wrapper(self, tmp_path: Path) -> None:
"""Without a single common root files land directly in extract_dir."""
buf = _make_tar(
[_reg("a.txt"), _reg("b.txt")],
{"a.txt": b"aaa", "b.txt": b"bbb"},
)
_tar_extract_all(buf, tmp_path)
assert (tmp_path / "a.txt").read_bytes() == b"aaa"
assert (tmp_path / "b.txt").read_bytes() == b"bbb"
def test_directory_member_extracted(self, tmp_path: Path) -> None:
buf = _make_tar([_dir("subdir/")])
_tar_extract_all(buf, tmp_path)
assert (tmp_path / "subdir").is_dir()
def test_symlink_within_dest_extracted(self, tmp_path: Path) -> None:
buf = _make_tar(
[_reg("target.txt"), _sym("link.txt", "target.txt")],
{"target.txt": b"data"},
)
_tar_extract_all(buf, tmp_path)
assert (tmp_path / "link.txt").exists()
def test_path_traversal_skipped(self, tmp_path: Path) -> None:
"""Member resolving outside extract_dir via .. is silently skipped."""
info = tarfile.TarInfo(name="sub/../../escape.txt")
info.type = tarfile.REGTYPE
info.size = 5
info.mode = 0o644
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
tf.addfile(info, io.BytesIO(b"OOPS!"))
buf.seek(0)
_tar_extract_all(buf, tmp_path)
assert not (tmp_path.parent / "escape.txt").exists()
assert not list(tmp_path.rglob("escape.txt"))
def test_absolute_symlink_target_skipped(self, tmp_path: Path) -> None:
"""Symlink pointing to an absolute path is silently skipped."""
buf = _make_tar(
[_reg("real.txt"), _sym("danger.lnk", "/etc/passwd")],
{"real.txt": b"ok"},
)
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "danger.lnk").exists()
def test_symlink_escaping_dest_skipped(self, tmp_path: Path) -> None:
"""Symlink whose resolved path exits extract_dir is silently skipped."""
buf = _make_tar([_sym("up.lnk", "../outside.txt")])
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "up.lnk").exists()
def test_special_file_skipped(self, tmp_path: Path) -> None:
"""Character-device and other special-file members are silently skipped."""
buf = _make_tar([_special("chardev")])
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "chardev").exists()
@pytest.mark.skipif(
os.name == "nt", reason="Windows has no POSIX executable permission bit"
)
def test_executable_bit_preserved(self, tmp_path: Path) -> None:
"""User-executable bit is kept for explicitly executable files."""
info = _reg("script.sh")
info.mode = 0o755
buf = _make_tar([info], {"script.sh": b"#!/bin/sh"})
_tar_extract_all(buf, tmp_path)
assert (tmp_path / "script.sh").stat().st_mode & 0o100 # S_IXUSR
def test_non_executable_exec_bits_stripped(self, tmp_path: Path) -> None:
"""Exec bits are removed when S_IXUSR is not set."""
info = _reg("data.bin")
info.mode = 0o654 # group/other exec present, user exec absent
buf = _make_tar([info], {"data.bin": b"\x00"})
_tar_extract_all(buf, tmp_path)
mode = (tmp_path / "data.bin").stat().st_mode
assert not (mode & 0o111) # all exec bits cleared
# ---------------------------------------------------------------------------
# ZIP archive helper
# ---------------------------------------------------------------------------
def _make_zip(entries: list[tuple[str, str | bytes]]) -> io.BytesIO:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
for name, content in entries:
zf.writestr(name, content)
buf.seek(0)
return buf
# ---------------------------------------------------------------------------
# _zip_extract_all
# ---------------------------------------------------------------------------
class TestZipExtractAll:
def test_basic_extraction_strips_wrapper(self, tmp_path: Path) -> None:
buf = _make_zip([("wrapper/file.txt", "hello")])
_zip_extract_all(buf, tmp_path)
assert (tmp_path / "file.txt").read_text() == "hello"
def test_flat_archive_no_wrapper(self, tmp_path: Path) -> None:
buf = _make_zip([("a.txt", "aaa"), ("b.txt", "bbb")])
_zip_extract_all(buf, tmp_path)
assert (tmp_path / "a.txt").read_text() == "aaa"
assert (tmp_path / "b.txt").read_text() == "bbb"
def test_wrapper_root_entry_skipped(self, tmp_path: Path) -> None:
"""The wrapper directory entry itself (step 3a) does not appear in dest."""
buf = _make_zip([("wrapper/", ""), ("wrapper/file.txt", "content")])
_zip_extract_all(buf, tmp_path)
assert (tmp_path / "file.txt").read_text() == "content"
assert not (tmp_path / "wrapper").exists()
def test_path_traversal_raises(self, tmp_path: Path) -> None:
# Two members with different roots so _detect_archive_root returns None
# and strip_prefix is not applied, leaving "../escape.txt" to hit the
# commonpath safety check directly.
buf = _make_zip([("safe.txt", "ok"), ("../escape.txt", "bad")])
with pytest.raises(ValueError, match="Unsafe path"):
_zip_extract_all(buf, tmp_path)
def test_multiple_files_extracted(self, tmp_path: Path) -> None:
entries = [(f"root/{c}.txt", c * 3) for c in "abc"]
buf = _make_zip(entries)
_zip_extract_all(buf, tmp_path)
for c in "abc":
assert (tmp_path / f"{c}.txt").read_text() == c * 3
# ---------------------------------------------------------------------------
# archive_extract_all dispatch
# ---------------------------------------------------------------------------
def _gzip_tar_bytes(entries: dict[str, bytes]) -> bytes:
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
for name, content in entries.items():
info = tarfile.TarInfo(name=name)
info.size = len(content)
info.mode = 0o644
tf.addfile(info, io.BytesIO(content))
return buf.getvalue()
class TestArchiveExtractAll:
def test_path_input_gzip_tar(self, tmp_path: Path) -> None:
archive = tmp_path / "test.tar.gz"
archive.write_bytes(_gzip_tar_bytes({"file.txt": b"hello"}))
dest = tmp_path / "out"
dest.mkdir()
archive_extract_all(archive, dest)
assert (dest / "file.txt").read_bytes() == b"hello"
def test_buffered_reader_input(self, tmp_path: Path) -> None:
archive = tmp_path / "test.tar.gz"
archive.write_bytes(_gzip_tar_bytes({"file.txt": b"data"}))
dest = tmp_path / "out"
dest.mkdir()
with archive.open("rb") as f: # io.BufferedReader
archive_extract_all(f, dest)
assert (dest / "file.txt").read_bytes() == b"data"
def test_rawio_input(self, tmp_path: Path) -> None:
archive = tmp_path / "test.tar.gz"
archive.write_bytes(_gzip_tar_bytes({"file.txt": b"raw"}))
dest = tmp_path / "out"
dest.mkdir()
archive_extract_all(io.FileIO(archive), dest)
assert (dest / "file.txt").read_bytes() == b"raw"
def test_zip_dispatched(self, tmp_path: Path) -> None:
archive = tmp_path / "test.zip"
archive.write_bytes(_make_zip([("file.txt", "hi")]).getvalue())
dest = tmp_path / "out"
dest.mkdir()
archive_extract_all(archive, dest)
assert (dest / "file.txt").read_text() == "hi"
def test_invalid_type_raises_type_error(self) -> None:
with pytest.raises(TypeError, match="archive must be"):
archive_extract_all(42, ".") # type: ignore[arg-type]
def test_unsupported_format_raises_value_error(self, tmp_path: Path) -> None:
bad = tmp_path / "bad.bin"
bad.write_bytes(b"\x00\x01\x02\x03\x04\x05\x06")
with pytest.raises(ValueError, match="Unsupported archive format"):
archive_extract_all(bad, tmp_path)
# ---------------------------------------------------------------------------
# download_from_mirrors
# ---------------------------------------------------------------------------
def _mock_response(content: bytes, ok: bool = True) -> MagicMock:
r = MagicMock()
r.__enter__.return_value = r
r.__exit__.return_value = False
if ok:
r.raise_for_status.return_value = None
else:
r.raise_for_status.side_effect = req.HTTPError("503")
r.headers = {"content-length": "0"} # suppress ProgressBar
r.iter_content.return_value = [content] if content else []
return r
class TestDownloadFromMirrors:
def test_success_returns_url_and_writes_content(self, tmp_path: Path) -> None:
target = tmp_path / "out.bin"
with patch(
"esphome.framework_helpers.requests.get",
return_value=_mock_response(b"filedata"),
):
url = download_from_mirrors(["https://example.com/f"], {}, target)
assert url == "https://example.com/f"
assert target.read_bytes() == b"filedata"
def test_substitutions_applied_to_url(self, tmp_path: Path) -> None:
with patch(
"esphome.framework_helpers.requests.get",
return_value=_mock_response(b"x"),
) as mock_get:
download_from_mirrors(
["https://example.com/{VERSION}.bin"],
{"VERSION": "1.2.3"},
tmp_path / "out.bin",
)
assert mock_get.call_args[0][0] == "https://example.com/1.2.3.bin"
def test_falls_back_to_second_mirror(self, tmp_path: Path) -> None:
with patch(
"esphome.framework_helpers.requests.get",
side_effect=[_mock_response(b"", ok=False), _mock_response(b"second")],
):
url = download_from_mirrors(
["https://mirror1.com/f", "https://mirror2.com/f"],
{},
tmp_path / "out.bin",
)
assert url == "https://mirror2.com/f"
assert (tmp_path / "out.bin").read_bytes() == b"second"
def test_all_mirrors_fail_reraises_last_exception(self, tmp_path: Path) -> None:
with (
patch(
"esphome.framework_helpers.requests.get",
return_value=_mock_response(b"", ok=False),
),
pytest.raises(req.HTTPError),
):
download_from_mirrors(["https://example.com/f"], {}, tmp_path / "out.bin")
def test_empty_mirrors_raises_value_error(self, tmp_path: Path) -> None:
with pytest.raises(ValueError, match="empty mirrors list"):
download_from_mirrors([], {}, tmp_path / "out.bin")
def test_invalid_target_type_raises_type_error(self) -> None:
with pytest.raises(TypeError, match="target must be"):
download_from_mirrors(["https://example.com/f"], {}, 42) # type: ignore[arg-type]
def test_file_like_target_written(self) -> None:
buf = io.BytesIO()
with patch(
"esphome.framework_helpers.requests.get",
return_value=_mock_response(b"bytes"),
):
download_from_mirrors(["https://example.com/f"], {}, buf)
buf.seek(0)
assert buf.read() == b"bytes"
def test_progress_bar_shown_when_content_length_known(self, tmp_path: Path) -> None:
r = _mock_response(b"1234567890")
r.headers = {"content-length": "10"}
with (
patch("esphome.framework_helpers.requests.get", return_value=r),
patch("esphome.framework_helpers.ProgressBar") as mock_pb,
):
download_from_mirrors(["https://example.com/f"], {}, tmp_path / "out.bin")
mock_pb.assert_called_once_with("Downloading")
mock_pb.return_value.update.assert_called()
def test_empty_chunk_not_written(self, tmp_path: Path) -> None:
"""Empty chunks yielded by iter_content are skipped without writing."""
r = MagicMock()
r.__enter__.return_value = r
r.__exit__.return_value = False
r.raise_for_status.return_value = None
r.headers = {"content-length": "0"}
r.iter_content.return_value = [b""] # one empty chunk
target = tmp_path / "out.bin"
with patch("esphome.framework_helpers.requests.get", return_value=r):
download_from_mirrors(["https://example.com/f"], {}, target)
assert target.exists()
assert target.read_bytes() == b""
# ---------------------------------------------------------------------------
# get_python_env_executable_path — Windows branch
# ---------------------------------------------------------------------------
def test_get_python_env_executable_path_nt() -> None:
"""Windows path uses Scripts/ and .exe suffix."""
from pathlib import PurePosixPath
with (
patch.object(os, "name", "nt"),
patch("esphome.framework_helpers.Path", PurePosixPath),
):
result = get_python_env_executable_path("/env", "python")
assert str(result) == "/env/Scripts/python.exe"
# ---------------------------------------------------------------------------
# _tar_extract_all — additional branch coverage
# ---------------------------------------------------------------------------
class TestTarExtractAllBranches:
@pytest.mark.skipif(
sys.version_info < (3, 12),
reason="patching os.name makes pathlib build a WindowsPath, which only "
"instantiates on POSIX in 3.12+",
)
def test_windows_drive_path_skipped(self, tmp_path: Path) -> None:
"""Windows-style drive path (C:/...) is skipped when os.name == 'nt'."""
info = tarfile.TarInfo(name="C:/secret.txt")
info.type = tarfile.REGTYPE
info.size = 0
info.mode = 0o644
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
tf.addfile(info)
buf.seek(0)
with patch.object(os, "name", "nt"):
_tar_extract_all(buf, tmp_path)
assert not list(tmp_path.rglob("*"))
def test_strip_root_exact_match_skipped(self, tmp_path: Path) -> None:
"""Member whose name equals strip_root exactly (no trailing slash) is skipped."""
# "wrapper" (file entry) + "wrapper/file.txt" causes _detect_archive_root
# to return "wrapper"; the bare "wrapper" entry matches strip_root exactly.
buf = _make_tar(
[_reg("wrapper"), _reg("wrapper/file.txt")],
{"wrapper/file.txt": b"content"},
)
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "wrapper").exists()
assert (tmp_path / "file.txt").read_bytes() == b"content"
def test_member_not_under_strip_prefix_skipped(self, tmp_path: Path) -> None:
"""Member whose name doesn't start with strip_prefix is silently skipped."""
buf = _make_tar([_reg("other/file.txt")], {"other/file.txt": b"data"})
with patch("esphome.framework_helpers._detect_archive_root", return_value="w"):
_tar_extract_all(buf, tmp_path)
assert not list(tmp_path.rglob("*"))
def test_hardlink_prefix_stripped(self, tmp_path: Path) -> None:
"""Hard-link linkname has wrapper prefix stripped along with its entry name."""
buf = _make_tar(
[_reg("wrapper/file.txt"), _hlnk("wrapper/link.txt", "wrapper/file.txt")],
{"wrapper/file.txt": b"data"},
)
_tar_extract_all(buf, tmp_path)
assert (tmp_path / "file.txt").read_bytes() == b"data"
assert (tmp_path / "link.txt").exists()
def test_hardlink_linkname_equals_strip_root_skipped(self, tmp_path: Path) -> None:
"""Hard link whose linkname equals strip_root is silently skipped."""
buf = _make_tar(
[_reg("wrapper/file.txt"), _hlnk("wrapper/link.txt", "wrapper")],
{"wrapper/file.txt": b"data"},
)
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "link.txt").exists()
def test_hardlink_linkname_outside_prefix_skipped(self, tmp_path: Path) -> None:
"""Hard link whose linkname doesn't start with strip_prefix is skipped."""
buf = _make_tar(
[_reg("wrapper/file.txt"), _hlnk("wrapper/link.txt", "other/file.txt")],
{"wrapper/file.txt": b"data"},
)
_tar_extract_all(buf, tmp_path)
assert not (tmp_path / "link.txt").exists()
def test_member_mode_none_skips_sanitization(self, tmp_path: Path) -> None:
"""Member with mode=None bypasses the sanitization block without error."""
info = _reg("file.txt")
buf = _make_tar([info], {"file.txt": b"data"})
buf.seek(0)
with tarfile.open(fileobj=buf) as tf:
members = tf.getmembers()
for m in members:
m.mode = None
buf.seek(0)
with (
patch("tarfile.TarFile.getmembers", return_value=members),
patch("tarfile.TarFile.extract"),
):
_tar_extract_all(buf, tmp_path)
def test_progress_bar_shown(self, tmp_path: Path) -> None:
"""A non-empty progress_header causes ProgressBar to be created and updated."""
buf = _make_tar([_reg("file.txt")], {"file.txt": b"x"})
with patch("esphome.framework_helpers.ProgressBar") as mock_pb:
_tar_extract_all(buf, tmp_path, progress_header="Extracting")
mock_pb.assert_called_once_with("Extracting")
mock_pb.return_value.update.assert_called()
# ---------------------------------------------------------------------------
# _zip_extract_all — additional branch coverage
# ---------------------------------------------------------------------------
class TestZipExtractAllBranches:
@pytest.mark.skipif(
sys.version_info < (3, 12),
reason="patching os.name makes pathlib build a WindowsPath, which only "
"instantiates on POSIX in 3.12+",
)
def test_windows_drive_path_skipped(self, tmp_path: Path) -> None:
"""Windows-style drive path (C:/...) is skipped when os.name == 'nt'."""
buf = _make_zip([("C:/secret.txt", "bad")])
with patch.object(os, "name", "nt"):
_zip_extract_all(buf, tmp_path)
assert not list(tmp_path.rglob("*"))
def test_member_not_under_strip_prefix_skipped(self, tmp_path: Path) -> None:
"""Member whose name doesn't start with strip_prefix is silently skipped."""
buf = _make_zip([("other/file.txt", "data")])
with patch("esphome.framework_helpers._detect_archive_root", return_value="w"):
_zip_extract_all(buf, tmp_path)
assert not list(tmp_path.rglob("*"))
def test_progress_bar_shown(self, tmp_path: Path) -> None:
"""A non-empty progress_header causes ProgressBar to be created and updated."""
buf = _make_zip([("file.txt", "hello")])
with patch("esphome.framework_helpers.ProgressBar") as mock_pb:
_zip_extract_all(buf, tmp_path, progress_header="Unzipping")
mock_pb.assert_called_once_with("Unzipping")
mock_pb.return_value.update.assert_called()
# ---------------------------------------------------------------------------
# _rename_with_retry
# ---------------------------------------------------------------------------
class TestRenameWithRetry:
def test_success_on_first_attempt(self, tmp_path: Path) -> None:
src = tmp_path / "src.txt"
src.write_text("data")
dst = tmp_path / "dst.txt"
_rename_with_retry(src, dst)
assert dst.read_text() == "data"
assert not src.exists()
def test_retries_on_permission_error_then_succeeds(self, tmp_path: Path) -> None:
src = tmp_path / "src.txt"
src.write_text("data")
dst = tmp_path / "dst.txt"
call_count = 0
original_rename = Path.rename
def flaky_rename(self, target):
nonlocal call_count
call_count += 1
if call_count == 1:
raise PermissionError("locked")
return original_rename(self, target)
with (
patch.object(Path, "rename", flaky_rename),
patch("esphome.framework_helpers.time.sleep"),
):
_rename_with_retry(src, dst, attempts=3)
assert dst.read_text() == "data"
def test_raises_after_all_attempts_fail(self, tmp_path: Path) -> None:
src = tmp_path / "src.txt"
src.write_text("data")
dst = tmp_path / "dst.txt"
with (
patch.object(Path, "rename", side_effect=PermissionError("locked")),
patch("esphome.framework_helpers.time.sleep"),
pytest.raises(PermissionError),
):
_rename_with_retry(src, dst, attempts=3)
def test_attempts_zero_is_noop(self, tmp_path: Path) -> None:
"""Zero attempts means the for-loop body never runs; src is untouched."""
src = tmp_path / "src.txt"
src.write_text("data")
dst = tmp_path / "dst.txt"
_rename_with_retry(src, dst, attempts=0)
assert src.exists()
assert not dst.exists()
# ---------------------------------------------------------------------------
# _7z_extract_all
# ---------------------------------------------------------------------------
@pytest.mark.skipif(not _HAS_PY7ZR, reason="py7zr not installed")
class TestSevenZipExtractAll:
@staticmethod
def _make_7z(entries: dict[str, bytes]) -> io.BytesIO:
import py7zr
buf = io.BytesIO()
with py7zr.SevenZipFile(buf, "w") as sz:
for name, content in entries.items():
sz.writef(io.BytesIO(content), name)
buf.seek(0)
return buf
def test_basic_extraction_no_wrapper(self, tmp_path: Path) -> None:
buf = self._make_7z({"a.txt": b"aaa", "b.txt": b"bbb"})
out = tmp_path / "out"
out.mkdir()
_7z_extract_all(buf, out)
assert (out / "a.txt").exists()
assert (out / "b.txt").exists()
def test_strips_wrapper_directory(self, tmp_path: Path) -> None:
buf = self._make_7z({"wrapper/file.txt": b"data"})
out = tmp_path / "out"
out.mkdir()
_7z_extract_all(buf, out)
assert (out / "file.txt").exists()
assert not (out / "wrapper").exists()
def test_staging_suffix_collision(self, tmp_path: Path) -> None:
"""When .extract_tmp_0 already exists, suffix is incremented to find a free slot."""
out = tmp_path / "out"
out.mkdir()
(out / ".extract_tmp_0").mkdir()
buf = self._make_7z({"file.txt": b"hi"})
_7z_extract_all(buf, out)
assert (out / "file.txt").exists()
# .extract_tmp_1 should be cleaned up after extraction
assert not (out / ".extract_tmp_1").exists()
def test_overwrites_existing_directory(self, tmp_path: Path) -> None:
"""Pre-existing destination directory is replaced."""
out = tmp_path / "out"
out.mkdir()
existing_dir = out / "file.txt"
existing_dir.mkdir()
buf = self._make_7z({"file.txt": b"new"})
_7z_extract_all(buf, out)
assert (out / "file.txt").is_file()
def test_overwrites_existing_file(self, tmp_path: Path) -> None:
"""Pre-existing destination file is replaced."""
out = tmp_path / "out"
out.mkdir()
(out / "file.txt").write_bytes(b"old")
buf = self._make_7z({"file.txt": b"new"})
_7z_extract_all(buf, out)
assert (out / "file.txt").exists()
def test_empty_name_skipped(self, tmp_path: Path) -> None:
"""Archive entries with empty names are silently skipped."""
import py7zr
buf = self._make_7z({"file.txt": b"data"})
out = tmp_path / "out"
out.mkdir()
with patch.object(
py7zr.SevenZipFile, "getnames", return_value=["", "file.txt"]
):
_7z_extract_all(buf, out)
assert (out / "file.txt").exists()
def test_path_traversal_skipped(self, tmp_path: Path) -> None:
"""Entries whose resolved path exits extract_dir are skipped."""
import py7zr
buf = self._make_7z({"file.txt": b"safe"})
out = tmp_path / "out"
out.mkdir()
with patch.object(
py7zr.SevenZipFile, "getnames", return_value=["../escape.txt", "file.txt"]
):
_7z_extract_all(buf, out)
assert not (tmp_path / "escape.txt").exists()
assert (out / "file.txt").exists()
def test_progress_bar_shown(self, tmp_path: Path) -> None:
buf = self._make_7z({"file.txt": b"x"})
out = tmp_path / "out"
out.mkdir()
with patch("esphome.framework_helpers.ProgressBar") as mock_pb:
_7z_extract_all(buf, out, progress_header="Unpacking 7z")
mock_pb.assert_called_once_with("Unpacking 7z")
mock_pb.return_value.update.assert_called()
def test_absolute_path_in_names_skipped(self, tmp_path: Path) -> None:
"""Names that resolve as absolute are silently skipped."""
import py7zr
buf = self._make_7z({"file.txt": b"safe"})
out = tmp_path / "out"
out.mkdir()
original_is_absolute = Path.is_absolute
def patched_is_absolute(self: Path) -> bool:
if str(self).startswith("C:"):
return True
return original_is_absolute(self)
with (
patch.object(
py7zr.SevenZipFile, "getnames", return_value=["C:/evil.txt", "file.txt"]
),
patch.object(Path, "is_absolute", patched_is_absolute),
):
_7z_extract_all(buf, out)
# Avoid `out / "C:"` here: pathlib treats "C:" as a drive (always
# "exists" on Windows). Assert on the actual extracted files instead.
extracted = sorted(p.name for p in out.rglob("*") if p.is_file())
assert extracted == ["file.txt"]
def test_dispatched_via_archive_extract_all(self, tmp_path: Path) -> None:
"""archive_extract_all dispatches 7z archives to _7z_extract_all."""
buf = self._make_7z({"hello.txt": b"world"})
data = buf.read()
assert data[:6] == b"\x37\x7a\xbc\xaf\x27\x1c"
archive = tmp_path / "test.7z"
archive.write_bytes(data)
out = tmp_path / "out"
out.mkdir()
archive_extract_all(archive, out)
assert (out / "hello.txt").exists()

View File

@@ -0,0 +1,219 @@
"""Tests for esphome.components.nrf52.framework helpers."""
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from esphome.components.nrf52.framework import (
_TOOLCHAIN_VERSION,
_get_toolchain_platform_info,
check_and_install,
)
from esphome.config_validation import Version
from esphome.const import KEY_CORE, KEY_FRAMEWORK_VERSION
from esphome.core import CORE, EsphomeError
@pytest.mark.parametrize(
("system", "machine", "expected"),
[
# default — no branch hit
("Linux", "x86_64", ("linux", "x86_64", "tar.xz")),
# arm64 → aarch64 rename
("Linux", "arm64", ("linux", "aarch64", "tar.xz")),
# darwin → macos rename only
("Darwin", "x86_64", ("macos", "x86_64", "tar.xz")),
# both renames apply
("Darwin", "arm64", ("macos", "aarch64", "tar.xz")),
# windows forces x86_64 + 7z; arm64 rename is overwritten
("Windows", "arm64", ("windows", "x86_64", "7z")),
],
)
def test_get_toolchain_platform_info(
system: str, machine: str, expected: tuple[str, str, str]
) -> None:
with (
patch("platform.system", return_value=system),
patch("platform.machine", return_value=machine),
):
assert _get_toolchain_platform_info() == expected
# ---------------------------------------------------------------------------
# Helpers and fixtures for check_and_install tests
# ---------------------------------------------------------------------------
_TEST_SDK_VERSION = "2.9.0"
@pytest.fixture
def nrf52_dirs(setup_core: Path) -> SimpleNamespace:
"""Populate CORE and pre-create SDK directories so sentinel.touch() succeeds."""
CORE.data[KEY_CORE] = {KEY_FRAMEWORK_VERSION: Version.parse(_TEST_SDK_VERSION)}
tools = CORE.data_dir / "sdk-nrf"
python_env = tools / "penvs" / f"v{_TEST_SDK_VERSION}"
framework = tools / "frameworks" / f"v{_TEST_SDK_VERSION}"
toolchain_dir = tools / "toolchains" / _TOOLCHAIN_VERSION
for d in (python_env, framework, toolchain_dir):
d.mkdir(parents=True, exist_ok=True)
return SimpleNamespace(
python_env=python_env,
framework=framework,
toolchain=toolchain_dir,
)
@pytest.fixture
def mock_nrf52_ops():
"""Patch all heavy I/O operations used by check_and_install."""
with (
patch("esphome.components.nrf52.framework.rmdir") as mock_rmdir,
patch("esphome.components.nrf52.framework.create_venv") as mock_create_venv,
patch(
"esphome.components.nrf52.framework.run_command_ok", return_value=True
) as mock_run_cmd,
patch(
"esphome.components.nrf52.framework.download_from_mirrors",
return_value="https://example.com/tc.tar.xz",
) as mock_download,
patch("esphome.components.nrf52.framework.archive_extract_all") as mock_extract,
):
yield SimpleNamespace(
rmdir=mock_rmdir,
create_venv=mock_create_venv,
run_command_ok=mock_run_cmd,
download_from_mirrors=mock_download,
archive_extract_all=mock_extract,
)
# ---------------------------------------------------------------------------
# check_and_install tests
# ---------------------------------------------------------------------------
class TestCheckAndInstall:
def test_all_installed_skips_all_steps(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""All three sentinels present → nothing downloaded or compiled."""
(nrf52_dirs.python_env / ".ready").touch()
(nrf52_dirs.framework / ".ready").touch()
(nrf52_dirs.toolchain / ".ready").touch()
check_and_install()
mock_nrf52_ops.create_venv.assert_not_called()
mock_nrf52_ops.run_command_ok.assert_not_called()
mock_nrf52_ops.download_from_mirrors.assert_not_called()
mock_nrf52_ops.archive_extract_all.assert_not_called()
def test_fresh_install_runs_all_steps(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""No sentinels → venv created, west installed, SDK init+update, toolchain downloaded."""
check_and_install()
mock_nrf52_ops.create_venv.assert_called_once()
# pip install west, west init, west update
assert mock_nrf52_ops.run_command_ok.call_count == 3
mock_nrf52_ops.download_from_mirrors.assert_called_once()
mock_nrf52_ops.archive_extract_all.assert_called_once()
assert (nrf52_dirs.python_env / ".ready").exists()
assert (nrf52_dirs.framework / ".ready").exists()
assert (nrf52_dirs.toolchain / ".ready").exists()
def test_venv_exists_installs_framework_and_toolchain(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""Venv ready but framework missing → skip venv creation, run SDK init+update."""
(nrf52_dirs.python_env / ".ready").touch()
check_and_install()
mock_nrf52_ops.create_venv.assert_not_called()
# west init + west update only (no pip install)
assert mock_nrf52_ops.run_command_ok.call_count == 2
mock_nrf52_ops.download_from_mirrors.assert_called_once()
def test_toolchain_only_missing(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""Venv and framework ready → only toolchain downloaded and extracted."""
(nrf52_dirs.python_env / ".ready").touch()
(nrf52_dirs.framework / ".ready").touch()
check_and_install()
mock_nrf52_ops.create_venv.assert_not_called()
mock_nrf52_ops.run_command_ok.assert_not_called()
mock_nrf52_ops.download_from_mirrors.assert_called_once()
mock_nrf52_ops.archive_extract_all.assert_called_once()
def test_west_install_failure_raises(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""Failing pip install west raises EsphomeError."""
mock_nrf52_ops.run_command_ok.return_value = False
with pytest.raises(EsphomeError, match="Install west"):
check_and_install()
def test_framework_init_failure_raises(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""Failing west init raises EsphomeError."""
(nrf52_dirs.python_env / ".ready").touch()
mock_nrf52_ops.run_command_ok.return_value = False
with pytest.raises(EsphomeError, match="Can't initialize"):
check_and_install()
def test_framework_update_failure_raises(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""Failing west update raises EsphomeError."""
(nrf52_dirs.python_env / ".ready").touch()
# init succeeds, update fails
mock_nrf52_ops.run_command_ok.side_effect = [True, False]
with pytest.raises(EsphomeError, match="Can't update"):
check_and_install()
def test_toolchain_download_passes_platform_substitutions(
self,
nrf52_dirs: SimpleNamespace,
mock_nrf52_ops: SimpleNamespace,
) -> None:
"""download_from_mirrors receives VERSION + platform triple from _get_toolchain_platform_info."""
(nrf52_dirs.python_env / ".ready").touch()
(nrf52_dirs.framework / ".ready").touch()
with patch(
"esphome.components.nrf52.framework._get_toolchain_platform_info",
return_value=("linux", "x86_64", "tar.xz"),
):
check_and_install()
args, _ = mock_nrf52_ops.download_from_mirrors.call_args
substitutions = args[1]
assert substitutions["VERSION"] == _TOOLCHAIN_VERSION
assert substitutions["sysname"] == "linux"
assert substitutions["machine"] == "x86_64"
assert substitutions["extension"] == "tar.xz"

View File

@@ -442,6 +442,21 @@ def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> Non
mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
def test_run_compile_without_process_limit(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""When no compile_process_limit is set, run_compile passes no -j flag."""
from esphome.const import CONF_ESPHOME
CORE.build_path = str(setup_core / "build" / "test")
config = {CONF_ESPHOME: {}}
mock_run_platformio_cli_run.return_value = 0
toolchain.run_compile(config, verbose=False)
mock_run_platformio_cli_run.assert_called_once_with(config, False)
def test_get_idedata_caches_result(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None: