[ci] Extract integration test bucketing into a pure function

Pull the run_all glob expansion + bucket computation out of main() into
_compute_integration_test_buckets() returning (run_integration, buckets).
The boundary tests now call this helper directly instead of driving
main() through ~14 patched dependencies, which both shrinks the test
helper and removes duplication with the existing test_main_* fixtures.
This commit is contained in:
J. Nick Koston
2026-04-29 18:41:06 -05:00
parent 37bcd7c59f
commit f3b75c0369
2 changed files with 68 additions and 132 deletions

View File

@@ -100,6 +100,43 @@ def _all_integration_test_files() -> list[str]:
)
def _compute_integration_test_buckets(
integration_run_all: bool,
integration_test_files: list[str],
) -> tuple[bool, list[dict[str, Any]]]:
"""Compute (run_integration, buckets) from the determine_integration_tests result.
Pure function for unit testing — no I/O beyond `_all_integration_test_files`
when `integration_run_all` is set.
`buckets` is a list of `{name, tests}` dicts where `tests` is a JSON-friendly
list of file paths so the workflow can build a bash array via jq, avoiding
shell word-splitting / glob hazards.
"""
if integration_run_all:
files = _all_integration_test_files()
else:
files = sorted(integration_test_files)
# Empty list (e.g. run_all expansion with no files on disk) would otherwise
# cause the workflow to invoke pytest with no path argument and collect
# tests outside tests/integration/. Suppress the run instead.
if not files:
return False, []
if len(files) > INTEGRATION_TESTS_SPLIT_THRESHOLD:
parts = [
part for part in _split_list(files, INTEGRATION_TESTS_SPLIT_BUCKETS) if part
]
buckets = [
{"name": f"{i + 1}/{len(parts)}", "tests": part}
for i, part in enumerate(parts)
]
else:
buckets = [{"name": "1/1", "tests": files}]
return True, buckets
class Platform(StrEnum):
"""Platform identifiers for memory impact analysis."""
@@ -830,43 +867,9 @@ def main() -> None:
integration_run_all, integration_test_files = determine_integration_tests(
args.branch
)
run_integration = integration_run_all or bool(integration_test_files)
# When run_all is set, expand to the full glob here so determine-jobs.py
# remains the single source of truth for which tests run. The workflow
# never re-globs the filesystem.
if integration_run_all:
integration_test_files = _all_integration_test_files()
else:
integration_test_files = sorted(integration_test_files)
# Guard: if expansion produced no files (shouldn't happen normally, but
# would cause the workflow to invoke pytest with no path argument and
# collect tests outside tests/integration/), treat as no integration run.
if not integration_test_files:
run_integration = False
# Pre-bucket the test list so the CI matrix can consume it directly.
# `tests` is a JSON list of file paths so the workflow can build a bash
# array via jq, avoiding shell word-splitting / glob hazards.
# Below threshold => 1 bucket; above threshold => INTEGRATION_TESTS_SPLIT_BUCKETS.
integration_test_buckets: list[dict[str, Any]]
if not run_integration:
integration_test_buckets = []
elif len(integration_test_files) > INTEGRATION_TESTS_SPLIT_THRESHOLD:
parts = [
part
for part in _split_list(
integration_test_files, INTEGRATION_TESTS_SPLIT_BUCKETS
)
if part
]
integration_test_buckets = [
{"name": f"{i + 1}/{len(parts)}", "tests": part}
for i, part in enumerate(parts)
]
else:
integration_test_buckets = [{"name": "1/1", "tests": integration_test_files}]
run_integration, integration_test_buckets = _compute_integration_test_buckets(
integration_run_all, integration_test_files
)
run_clang_tidy = should_run_clang_tidy(args.branch)
run_clang_format = should_run_clang_format(args.branch)
run_python_linters = should_run_python_linters(args.branch)

View File

@@ -1,9 +1,7 @@
"""Unit tests for script/determine-jobs.py module."""
from collections.abc import Generator
import contextlib
import importlib.util
import io
import json
import os
from pathlib import Path
@@ -382,106 +380,41 @@ def test_main_with_branch_argument(
assert output["cpp_unit_tests_components"] == ["mqtt"]
def _run_main_for_integration_buckets(
*,
integration_return: tuple[bool, list[str]],
glob_files: list[str] | None = None,
) -> dict:
"""Drive determine_jobs.main() with stubbed inputs and return its parsed output.
Helper for testing integration_test_buckets in isolation. Bypasses every
unrelated branch (clang-tidy, components, memory, cpp tests, batches) so
the test focuses purely on the bucketing decision logic.
"""
patches: list = [
patch.object(
determine_jobs,
"determine_integration_tests",
return_value=integration_return,
),
patch.object(determine_jobs, "should_run_clang_tidy", return_value=False),
patch.object(determine_jobs, "should_run_clang_format", return_value=False),
patch.object(determine_jobs, "should_run_python_linters", return_value=False),
patch.object(determine_jobs, "should_run_import_time", return_value=False),
patch.object(determine_jobs, "count_changed_cpp_files", return_value=0),
patch.object(determine_jobs, "changed_files", return_value=[]),
patch.object(determine_jobs, "get_changed_components", return_value=[]),
patch.object(
determine_jobs, "get_components_with_dependencies", return_value=[]
),
patch.object(
determine_jobs,
"detect_memory_impact_config",
return_value={"should_run": "false"},
),
patch.object(
determine_jobs, "create_intelligent_batches", return_value=([], {})
),
patch.object(
determine_jobs, "determine_cpp_unit_tests", return_value=(False, [])
),
patch.object(determine_jobs, "should_run_benchmarks", return_value=False),
patch.object(determine_jobs, "get_target_branch", return_value="dev"),
patch("sys.argv", ["determine-jobs.py"]),
]
if glob_files is not None:
patches.append(
patch.object(
determine_jobs,
"_all_integration_test_files",
return_value=glob_files,
)
)
buf = io.StringIO()
with contextlib.ExitStack() as stack:
for p in patches:
stack.enter_context(p)
with contextlib.redirect_stdout(buf):
determine_jobs.main()
return json.loads(buf.getvalue())
def test_compute_integration_test_buckets_empty() -> None:
"""No integration tests scheduled => (False, [])."""
run, buckets = determine_jobs._compute_integration_test_buckets(False, [])
assert run is False
assert buckets == []
def test_integration_buckets_empty_when_no_tests() -> None:
"""No integration tests scheduled => integration_test_buckets is []."""
output = _run_main_for_integration_buckets(integration_return=(False, []))
assert output["integration_tests"] is False
assert output["integration_test_buckets"] == []
def test_compute_integration_test_buckets_below_threshold() -> None:
"""A small explicit list (<= threshold) => single 1/1 bucket with that list."""
files = [f"tests/integration/test_{name}.py" for name in ("c", "a", "b")]
run, buckets = determine_jobs._compute_integration_test_buckets(False, files)
assert run is True
assert buckets == [{"name": "1/1", "tests": sorted(files)}]
def test_integration_buckets_specific_files_below_threshold() -> None:
"""A small explicit list (<= threshold) produces a single 1/1 bucket
containing exactly that list as a JSON array."""
files = [f"tests/integration/test_{name}.py" for name in ("a", "b", "c", "d", "e")]
output = _run_main_for_integration_buckets(integration_return=(False, files))
assert output["integration_tests"] is True
buckets = output["integration_test_buckets"]
assert len(buckets) == 1
assert buckets[0]["name"] == "1/1"
assert buckets[0]["tests"] == sorted(files)
def test_integration_buckets_at_threshold_stays_single() -> None:
"""Exactly INTEGRATION_TESTS_SPLIT_THRESHOLD files stays as one bucket
def test_compute_integration_test_buckets_at_threshold_stays_single() -> None:
"""Exactly INTEGRATION_TESTS_SPLIT_THRESHOLD files => still one bucket
(the split kicks in only when count is strictly greater than threshold)."""
files = [
f"tests/integration/test_{i:02d}.py"
for i in range(determine_jobs.INTEGRATION_TESTS_SPLIT_THRESHOLD)
]
output = _run_main_for_integration_buckets(integration_return=(False, files))
buckets = output["integration_test_buckets"]
run, buckets = determine_jobs._compute_integration_test_buckets(False, files)
assert run is True
assert len(buckets) == 1
assert buckets[0]["name"] == "1/1"
assert buckets[0]["tests"] == sorted(files)
def test_integration_buckets_just_over_threshold_splits() -> None:
"""One file over the threshold triggers the 3-bucket fan-out."""
def test_compute_integration_test_buckets_just_over_threshold_splits() -> None:
"""One file over the threshold triggers the 3-bucket fan-out, balanced."""
n = determine_jobs.INTEGRATION_TESTS_SPLIT_THRESHOLD + 1
files = [f"tests/integration/test_{i:02d}.py" for i in range(n)]
output = _run_main_for_integration_buckets(integration_return=(False, files))
buckets = output["integration_test_buckets"]
assert len(buckets) == determine_jobs.INTEGRATION_TESTS_SPLIT_BUCKETS
run, buckets = determine_jobs._compute_integration_test_buckets(False, files)
assert run is True
assert [b["name"] for b in buckets] == ["1/3", "2/3", "3/3"]
union = [path for b in buckets for path in b["tests"]]
assert union == sorted(files)
@@ -489,15 +422,15 @@ def test_integration_buckets_just_over_threshold_splits() -> None:
assert max(sizes) - min(sizes) <= 1
def test_integration_buckets_run_all_with_empty_glob_disables_run() -> None:
"""If run_all=True but the glob unexpectedly returns no files, the run is
suppressed (otherwise pytest would be invoked with no path argument and
collect tests outside tests/integration/)."""
output = _run_main_for_integration_buckets(
integration_return=(True, []), glob_files=[]
)
assert output["integration_tests"] is False
assert output["integration_test_buckets"] == []
def test_compute_integration_test_buckets_run_all_with_empty_glob_disables_run() -> (
None
):
"""run_all=True but glob returns no files => run suppressed (otherwise
pytest would collect tests outside tests/integration/)."""
with patch.object(determine_jobs, "_all_integration_test_files", return_value=[]):
run, buckets = determine_jobs._compute_integration_test_buckets(True, [])
assert run is False
assert buckets == []
def test_determine_integration_tests(