[ci] Add import-time regression check for esphome.__main__ (#15954)

This commit is contained in:
J. Nick Koston
2026-04-28 09:05:12 -05:00
committed by GitHub
parent 0759a3c681
commit 0a4d9b430f
7 changed files with 569 additions and 0 deletions

View File

@@ -0,0 +1,191 @@
"""Unit tests for script/check_import_time.py."""
from __future__ import annotations
import importlib.util
import json
import os
from pathlib import Path
import sys
from unittest.mock import patch
import pytest
# Load the script-under-test as `check_import_time` (it's a hyphenated path
# inside `script/` that mirrors the existing `determine_jobs` pattern).
script_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "script")
)
sys.path.insert(0, script_dir)
spec = importlib.util.spec_from_file_location(
"check_import_time", os.path.join(script_dir, "check_import_time.py")
)
check_import_time = importlib.util.module_from_spec(spec)
spec.loader.exec_module(check_import_time)
def _entry(name: str, self_us: int, cumulative_us: int) -> dict:
"""Build a minimal HAR entry matching `importtime_waterfall --har`."""
return {
"request": {"url": name},
"time": cumulative_us,
"timings": {"receive": self_us, "wait": cumulative_us - self_us},
}
def _har(*entries: dict) -> dict:
return {"log": {"entries": list(entries)}}
def test_root_cumulative_us_returns_time_for_root_module() -> None:
har = _har(
_entry("dep_a", 500, 500),
_entry("dep_b", 300, 300),
_entry("esphome.__main__", 100, 1000),
)
assert check_import_time.root_cumulative_us(har, "esphome.__main__") == 1000
def test_root_cumulative_us_missing_module_raises() -> None:
har = _har(_entry("something.else", 100, 100))
with pytest.raises(RuntimeError, match="No HAR entry for 'esphome.__main__'"):
check_import_time.root_cumulative_us(har, "esphome.__main__")
def test_top_offenders_ranks_by_self_time_descending() -> None:
har = _har(
_entry("small", 100, 100),
_entry("big", 5000, 5000),
_entry("medium", 2000, 2500),
)
result = check_import_time.top_offenders(har, n=10)
assert [name for name, _, _ in result] == ["big", "medium", "small"]
assert result[0] == ("big", 5000, 5000)
def test_top_offenders_respects_n_limit() -> None:
har = _har(*[_entry(f"m{i}", i * 100, i * 100) for i in range(1, 20)])
assert len(check_import_time.top_offenders(har, n=5)) == 5
def test_top_offenders_dedupes_repeat_names_keeping_first() -> None:
har = _har(
_entry("pkg", 5000, 5000),
_entry("pkg", 100, 100), # reimport later in trace
_entry("other", 1000, 1000),
)
result = check_import_time.top_offenders(har, n=10)
assert [name for name, _, _ in result] == ["pkg", "other"]
# First occurrence wins
assert ("pkg", 5000, 5000) in result
def test_format_us_switches_to_ms_at_threshold() -> None:
assert check_import_time._format_us(500) == "500us"
assert check_import_time._format_us(999) == "999us"
assert check_import_time._format_us(1000) == "1.0ms"
assert check_import_time._format_us(12345) == "12.3ms"
def test_read_write_budget_roundtrip(tmp_path: Path) -> None:
budget_path = tmp_path / "budget.json"
with patch.object(check_import_time, "BUDGET_PATH", budget_path):
assert check_import_time.read_budget() == {}
check_import_time.write_budget(cumulative_us=12345, margin_pct=20)
loaded = check_import_time.read_budget()
assert loaded["cumulative_us"] == 12345
assert loaded["margin_pct"] == 20
assert loaded["target_module"] == check_import_time.TARGET_MODULE
def test_cmd_check_passes_when_measured_within_ceiling(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000, # 100ms
}
)
)
# Measured 90ms: inside 100ms + 15% = 115ms ceiling
har = _har(_entry(check_import_time.TARGET_MODULE, 1000, 90000))
args = type("A", (), {"har": None})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "measure", return_value=har),
):
rc = check_import_time.cmd_check(args)
assert rc == 0
out = capsys.readouterr().out
assert "measured esphome.__main__:" in out
assert "budget 100.0ms" in out
def test_cmd_check_fails_when_measured_exceeds_ceiling(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000,
}
)
)
# Measured 120ms: over 100ms + 15% = 115ms ceiling
har = _har(
_entry("offender_a", 10000, 10000),
_entry(check_import_time.TARGET_MODULE, 1000, 120000),
)
args = type("A", (), {"har": None})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "measure", return_value=har),
):
rc = check_import_time.cmd_check(args)
assert rc == 1
err = capsys.readouterr().err
assert "REGRESSION" in err
assert "120.0ms" in err
assert "offender_a" in err # top offender table
def test_cmd_check_returns_2_when_budget_missing(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "nonexistent.json"
args = type("A", (), {"har": None})()
with patch.object(check_import_time, "BUDGET_PATH", budget_path):
rc = check_import_time.cmd_check(args)
assert rc == 2
assert "missing" in capsys.readouterr().err
def test_cmd_check_writes_har_when_path_given(tmp_path: Path) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000,
}
)
)
har_path = tmp_path / "out.har"
har_text = json.dumps(_har(_entry(check_import_time.TARGET_MODULE, 1000, 80000)))
args = type("A", (), {"har": str(har_path)})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "run_waterfall", return_value=har_text),
):
rc = check_import_time.cmd_check(args)
assert rc == 0
assert har_path.exists()
assert json.loads(har_path.read_text()) == json.loads(har_text)

View File

@@ -56,6 +56,13 @@ def mock_should_run_python_linters() -> Generator[Mock, None, None]:
yield mock
@pytest.fixture
def mock_should_run_import_time() -> Generator[Mock, None, None]:
"""Mock should_run_import_time from determine_jobs."""
with patch.object(determine_jobs, "should_run_import_time") as mock:
yield mock
@pytest.fixture
def mock_determine_cpp_unit_tests() -> Generator[Mock, None, None]:
"""Mock determine_cpp_unit_tests from helpers."""
@@ -91,6 +98,7 @@ def test_main_all_tests_should_run(
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_should_run_import_time: Mock,
mock_changed_files: Mock,
mock_determine_cpp_unit_tests: Mock,
capsys: pytest.CaptureFixture[str],
@@ -104,6 +112,7 @@ def test_main_all_tests_should_run(
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = True
mock_should_run_python_linters.return_value = True
mock_should_run_import_time.return_value = True
mock_determine_cpp_unit_tests.return_value = (False, ["wifi", "api", "sensor"])
# Mock changed_files to return non-component files (to avoid memory impact)
@@ -158,6 +167,7 @@ def test_main_all_tests_should_run(
assert output["clang_tidy_mode"] in ["nosplit", "split"]
assert output["clang_format"] is True
assert output["python_linters"] is True
assert output["import_time"] is True
assert output["changed_components"] == ["wifi", "api", "sensor"]
# changed_components_with_tests will only include components that actually have test files
assert "changed_components_with_tests" in output
@@ -189,6 +199,7 @@ def test_main_no_tests_should_run(
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_should_run_import_time: Mock,
mock_changed_files: Mock,
mock_determine_cpp_unit_tests: Mock,
capsys: pytest.CaptureFixture[str],
@@ -202,6 +213,7 @@ def test_main_no_tests_should_run(
mock_should_run_clang_tidy.return_value = False
mock_should_run_clang_format.return_value = False
mock_should_run_python_linters.return_value = False
mock_should_run_import_time.return_value = False
mock_determine_cpp_unit_tests.return_value = (False, [])
# Mock changed_files to return no component files
@@ -241,6 +253,7 @@ def test_main_no_tests_should_run(
assert output["clang_tidy_mode"] == "disabled"
assert output["clang_format"] is False
assert output["python_linters"] is False
assert output["import_time"] is False
assert output["changed_components"] == []
assert output["changed_components_with_tests"] == []
assert output["component_test_count"] == 0
@@ -261,6 +274,7 @@ def test_main_with_branch_argument(
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_should_run_import_time: Mock,
mock_changed_files: Mock,
mock_determine_cpp_unit_tests: Mock,
capsys: pytest.CaptureFixture[str],
@@ -274,6 +288,7 @@ def test_main_with_branch_argument(
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = False
mock_should_run_python_linters.return_value = True
mock_should_run_import_time.return_value = True
mock_determine_cpp_unit_tests.return_value = (False, ["mqtt"])
# Mock changed_files to return non-component files (to avoid memory impact)
@@ -310,6 +325,7 @@ def test_main_with_branch_argument(
mock_should_run_clang_tidy.assert_called_once_with("main")
mock_should_run_clang_format.assert_called_once_with("main")
mock_should_run_python_linters.assert_called_once_with("main")
mock_should_run_import_time.assert_called_once_with("main")
# Check output
captured = capsys.readouterr()
@@ -322,6 +338,7 @@ def test_main_with_branch_argument(
assert output["clang_tidy_mode"] in ["nosplit", "split"]
assert output["clang_format"] is False
assert output["python_linters"] is True
assert output["import_time"] is True
assert output["changed_components"] == ["mqtt"]
# changed_components_with_tests will only include components that actually have test files
assert "changed_components_with_tests" in output
@@ -597,6 +614,50 @@ def test_should_run_python_linters_with_branch() -> None:
mock_changed.assert_called_once_with("release")
@pytest.mark.parametrize(
("changed_files", "expected_result"),
[
# esphome Python files trigger the check
(["esphome/__main__.py"], True),
(["esphome/components/wifi/__init__.py"], True),
(["esphome/core/config.py"], True),
(["esphome/types.pyi"], True),
# Dependency declarations and the check's own files trigger
(["requirements.txt"], True),
(["requirements_dev.txt"], True),
(["requirements_test.txt"], True),
(["pyproject.toml"], True),
(["script/check_import_time.py"], True),
(["script/import_time_budget.json"], True),
# Mixed: any triggering file is enough
(["docs/README.md", "esphome/config.py"], True),
# Python files outside esphome/ don't trigger
(["script/some_other_script.py"], False),
(["tests/script/test_determine_jobs.py"], False),
# Non-Python changes don't trigger
(["esphome/core/component.cpp"], False),
(["tests/components/wifi/test.esp32-idf.yaml"], False),
(["README.md"], False),
([], False),
],
)
def test_should_run_import_time(
changed_files: list[str], expected_result: bool
) -> None:
"""Test should_run_import_time function."""
with patch.object(determine_jobs, "changed_files", return_value=changed_files):
result = determine_jobs.should_run_import_time()
assert result == expected_result
def test_should_run_import_time_with_branch() -> None:
"""Test should_run_import_time with branch argument."""
with patch.object(determine_jobs, "changed_files") as mock_changed:
mock_changed.return_value = []
determine_jobs.should_run_import_time("release")
mock_changed.assert_called_once_with("release")
@pytest.mark.parametrize(
("changed_files", "expected_result"),
[