From f3b75c03696daec84f4a28de2941cca034f1bda3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 29 Apr 2026 18:41:06 -0500 Subject: [PATCH] [ci] Extract integration test bucketing into a pure function Pull the run_all glob expansion + bucket computation out of main() into _compute_integration_test_buckets() returning (run_integration, buckets). The boundary tests now call this helper directly instead of driving main() through ~14 patched dependencies, which both shrinks the test helper and removes duplication with the existing test_main_* fixtures. --- script/determine-jobs.py | 77 ++++++++--------- tests/script/test_determine_jobs.py | 123 +++++++--------------------- 2 files changed, 68 insertions(+), 132 deletions(-) diff --git a/script/determine-jobs.py b/script/determine-jobs.py index 21f05c975b..c0cf8ecbdc 100755 --- a/script/determine-jobs.py +++ b/script/determine-jobs.py @@ -100,6 +100,43 @@ def _all_integration_test_files() -> list[str]: ) +def _compute_integration_test_buckets( + integration_run_all: bool, + integration_test_files: list[str], +) -> tuple[bool, list[dict[str, Any]]]: + """Compute (run_integration, buckets) from the determine_integration_tests result. + + Pure function for unit testing — no I/O beyond `_all_integration_test_files` + when `integration_run_all` is set. + + `buckets` is a list of `{name, tests}` dicts where `tests` is a JSON-friendly + list of file paths so the workflow can build a bash array via jq, avoiding + shell word-splitting / glob hazards. + """ + if integration_run_all: + files = _all_integration_test_files() + else: + files = sorted(integration_test_files) + + # Empty list (e.g. run_all expansion with no files on disk) would otherwise + # cause the workflow to invoke pytest with no path argument and collect + # tests outside tests/integration/. Suppress the run instead. + if not files: + return False, [] + + if len(files) > INTEGRATION_TESTS_SPLIT_THRESHOLD: + parts = [ + part for part in _split_list(files, INTEGRATION_TESTS_SPLIT_BUCKETS) if part + ] + buckets = [ + {"name": f"{i + 1}/{len(parts)}", "tests": part} + for i, part in enumerate(parts) + ] + else: + buckets = [{"name": "1/1", "tests": files}] + return True, buckets + + class Platform(StrEnum): """Platform identifiers for memory impact analysis.""" @@ -830,43 +867,9 @@ def main() -> None: integration_run_all, integration_test_files = determine_integration_tests( args.branch ) - run_integration = integration_run_all or bool(integration_test_files) - - # When run_all is set, expand to the full glob here so determine-jobs.py - # remains the single source of truth for which tests run. The workflow - # never re-globs the filesystem. - if integration_run_all: - integration_test_files = _all_integration_test_files() - else: - integration_test_files = sorted(integration_test_files) - - # Guard: if expansion produced no files (shouldn't happen normally, but - # would cause the workflow to invoke pytest with no path argument and - # collect tests outside tests/integration/), treat as no integration run. - if not integration_test_files: - run_integration = False - - # Pre-bucket the test list so the CI matrix can consume it directly. - # `tests` is a JSON list of file paths so the workflow can build a bash - # array via jq, avoiding shell word-splitting / glob hazards. - # Below threshold => 1 bucket; above threshold => INTEGRATION_TESTS_SPLIT_BUCKETS. - integration_test_buckets: list[dict[str, Any]] - if not run_integration: - integration_test_buckets = [] - elif len(integration_test_files) > INTEGRATION_TESTS_SPLIT_THRESHOLD: - parts = [ - part - for part in _split_list( - integration_test_files, INTEGRATION_TESTS_SPLIT_BUCKETS - ) - if part - ] - integration_test_buckets = [ - {"name": f"{i + 1}/{len(parts)}", "tests": part} - for i, part in enumerate(parts) - ] - else: - integration_test_buckets = [{"name": "1/1", "tests": integration_test_files}] + run_integration, integration_test_buckets = _compute_integration_test_buckets( + integration_run_all, integration_test_files + ) run_clang_tidy = should_run_clang_tidy(args.branch) run_clang_format = should_run_clang_format(args.branch) run_python_linters = should_run_python_linters(args.branch) diff --git a/tests/script/test_determine_jobs.py b/tests/script/test_determine_jobs.py index 9523cdb3fe..e85f1757b0 100644 --- a/tests/script/test_determine_jobs.py +++ b/tests/script/test_determine_jobs.py @@ -1,9 +1,7 @@ """Unit tests for script/determine-jobs.py module.""" from collections.abc import Generator -import contextlib import importlib.util -import io import json import os from pathlib import Path @@ -382,106 +380,41 @@ def test_main_with_branch_argument( assert output["cpp_unit_tests_components"] == ["mqtt"] -def _run_main_for_integration_buckets( - *, - integration_return: tuple[bool, list[str]], - glob_files: list[str] | None = None, -) -> dict: - """Drive determine_jobs.main() with stubbed inputs and return its parsed output. - - Helper for testing integration_test_buckets in isolation. Bypasses every - unrelated branch (clang-tidy, components, memory, cpp tests, batches) so - the test focuses purely on the bucketing decision logic. - """ - patches: list = [ - patch.object( - determine_jobs, - "determine_integration_tests", - return_value=integration_return, - ), - patch.object(determine_jobs, "should_run_clang_tidy", return_value=False), - patch.object(determine_jobs, "should_run_clang_format", return_value=False), - patch.object(determine_jobs, "should_run_python_linters", return_value=False), - patch.object(determine_jobs, "should_run_import_time", return_value=False), - patch.object(determine_jobs, "count_changed_cpp_files", return_value=0), - patch.object(determine_jobs, "changed_files", return_value=[]), - patch.object(determine_jobs, "get_changed_components", return_value=[]), - patch.object( - determine_jobs, "get_components_with_dependencies", return_value=[] - ), - patch.object( - determine_jobs, - "detect_memory_impact_config", - return_value={"should_run": "false"}, - ), - patch.object( - determine_jobs, "create_intelligent_batches", return_value=([], {}) - ), - patch.object( - determine_jobs, "determine_cpp_unit_tests", return_value=(False, []) - ), - patch.object(determine_jobs, "should_run_benchmarks", return_value=False), - patch.object(determine_jobs, "get_target_branch", return_value="dev"), - patch("sys.argv", ["determine-jobs.py"]), - ] - if glob_files is not None: - patches.append( - patch.object( - determine_jobs, - "_all_integration_test_files", - return_value=glob_files, - ) - ) - - buf = io.StringIO() - with contextlib.ExitStack() as stack: - for p in patches: - stack.enter_context(p) - with contextlib.redirect_stdout(buf): - determine_jobs.main() - return json.loads(buf.getvalue()) +def test_compute_integration_test_buckets_empty() -> None: + """No integration tests scheduled => (False, []).""" + run, buckets = determine_jobs._compute_integration_test_buckets(False, []) + assert run is False + assert buckets == [] -def test_integration_buckets_empty_when_no_tests() -> None: - """No integration tests scheduled => integration_test_buckets is [].""" - output = _run_main_for_integration_buckets(integration_return=(False, [])) - assert output["integration_tests"] is False - assert output["integration_test_buckets"] == [] +def test_compute_integration_test_buckets_below_threshold() -> None: + """A small explicit list (<= threshold) => single 1/1 bucket with that list.""" + files = [f"tests/integration/test_{name}.py" for name in ("c", "a", "b")] + run, buckets = determine_jobs._compute_integration_test_buckets(False, files) + assert run is True + assert buckets == [{"name": "1/1", "tests": sorted(files)}] -def test_integration_buckets_specific_files_below_threshold() -> None: - """A small explicit list (<= threshold) produces a single 1/1 bucket - containing exactly that list as a JSON array.""" - files = [f"tests/integration/test_{name}.py" for name in ("a", "b", "c", "d", "e")] - output = _run_main_for_integration_buckets(integration_return=(False, files)) - assert output["integration_tests"] is True - buckets = output["integration_test_buckets"] - assert len(buckets) == 1 - assert buckets[0]["name"] == "1/1" - assert buckets[0]["tests"] == sorted(files) - - -def test_integration_buckets_at_threshold_stays_single() -> None: - """Exactly INTEGRATION_TESTS_SPLIT_THRESHOLD files stays as one bucket +def test_compute_integration_test_buckets_at_threshold_stays_single() -> None: + """Exactly INTEGRATION_TESTS_SPLIT_THRESHOLD files => still one bucket (the split kicks in only when count is strictly greater than threshold).""" files = [ f"tests/integration/test_{i:02d}.py" for i in range(determine_jobs.INTEGRATION_TESTS_SPLIT_THRESHOLD) ] - output = _run_main_for_integration_buckets(integration_return=(False, files)) - buckets = output["integration_test_buckets"] + run, buckets = determine_jobs._compute_integration_test_buckets(False, files) + assert run is True assert len(buckets) == 1 assert buckets[0]["name"] == "1/1" assert buckets[0]["tests"] == sorted(files) -def test_integration_buckets_just_over_threshold_splits() -> None: - """One file over the threshold triggers the 3-bucket fan-out.""" +def test_compute_integration_test_buckets_just_over_threshold_splits() -> None: + """One file over the threshold triggers the 3-bucket fan-out, balanced.""" n = determine_jobs.INTEGRATION_TESTS_SPLIT_THRESHOLD + 1 files = [f"tests/integration/test_{i:02d}.py" for i in range(n)] - output = _run_main_for_integration_buckets(integration_return=(False, files)) - buckets = output["integration_test_buckets"] - assert len(buckets) == determine_jobs.INTEGRATION_TESTS_SPLIT_BUCKETS + run, buckets = determine_jobs._compute_integration_test_buckets(False, files) + assert run is True assert [b["name"] for b in buckets] == ["1/3", "2/3", "3/3"] union = [path for b in buckets for path in b["tests"]] assert union == sorted(files) @@ -489,15 +422,15 @@ def test_integration_buckets_just_over_threshold_splits() -> None: assert max(sizes) - min(sizes) <= 1 -def test_integration_buckets_run_all_with_empty_glob_disables_run() -> None: - """If run_all=True but the glob unexpectedly returns no files, the run is - suppressed (otherwise pytest would be invoked with no path argument and - collect tests outside tests/integration/).""" - output = _run_main_for_integration_buckets( - integration_return=(True, []), glob_files=[] - ) - assert output["integration_tests"] is False - assert output["integration_test_buckets"] == [] +def test_compute_integration_test_buckets_run_all_with_empty_glob_disables_run() -> ( + None +): + """run_all=True but glob returns no files => run suppressed (otherwise + pytest would collect tests outside tests/integration/).""" + with patch.object(determine_jobs, "_all_integration_test_files", return_value=[]): + run, buckets = determine_jobs._compute_integration_test_buckets(True, []) + assert run is False + assert buckets == [] def test_determine_integration_tests(