From aef2d74e41123f77900bc5cab37a75cfb9b6e2b1 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 8 Mar 2026 14:32:59 -1000 Subject: [PATCH] [ld2450] Add integration tests with mock UART (#14611) --- .../fixtures/uart_mock_ld2450.yaml | 221 ++++++++++++++++++ tests/integration/state_utils.py | 28 ++- tests/integration/test_uart_mock_ld2450.py | 204 ++++++++++++++++ 3 files changed, 448 insertions(+), 5 deletions(-) create mode 100644 tests/integration/fixtures/uart_mock_ld2450.yaml create mode 100644 tests/integration/test_uart_mock_ld2450.py diff --git a/tests/integration/fixtures/uart_mock_ld2450.yaml b/tests/integration/fixtures/uart_mock_ld2450.yaml new file mode 100644 index 0000000000..269136da68 --- /dev/null +++ b/tests/integration/fixtures/uart_mock_ld2450.yaml @@ -0,0 +1,221 @@ +esphome: + name: uart-mock-ld2450-test + +host: +api: +logger: + level: VERBOSE + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + +# Dummy uart entry to satisfy ld2450's DEPENDENCIES = ["uart"] +# The actual UART bus used is the uart_mock component below +uart: + baud_rate: 115200 + port: /dev/null + +uart_mock: + id: mock_uart + baud_rate: 256000 + auto_start: false + responses: + # Catch-all response: match any command footer (04 03 02 01). + # Returns a generic ACK to unblock setup commands. + # + # Response layout: + # [0-3] FD FC FB FA = header + # [4-5] 04 00 = length 4 + # [6] FF = cmd (handled as CMD_ENABLE_CONF) + # [7] 01 = status (ACK) + # [8-9] 00 00 = error = 0 + # [10-13] 04 03 02 01 = footer + - expect_tx: [0x04, 0x03, 0x02, 0x01] + inject_rx: + [ + 0xFD, 0xFC, 0xFB, 0xFA, + 0x04, 0x00, + 0xFF, 0x01, + 0x00, 0x00, + 0x04, 0x03, 0x02, 0x01, + ] + + injections: + # Phase 1 (t=100ms): Valid LD2450 periodic data frame - happy path + # The buffer is clean at this point, so this frame should parse correctly. + # + # Target 1: X=-500mm, Y=1000mm, Speed=-50mm/s (approaching), Res=320mm + # X: magnitude=500 (0x01F4), negative → high=0x01, low=0xF4 + # Y: magnitude=1000 (0x03E8), positive → high=0x83, low=0xE8 + # Speed: raw=5, negative (approaching) → high=0x00, low=0x05, decoded=-50mm/s + # Resolution: 320 → low=0x40, high=0x01 + # Distance: sqrt(500²+1000²) = sqrt(1250000) ≈ 1118mm + # + # Target 2: X=200mm, Y=500mm, Speed=0 (stationary), Res=100mm + # X: magnitude=200 (0x00C8), positive → high=0x80, low=0xC8 + # Y: magnitude=500 (0x01F4), positive → high=0x81, low=0xF4 + # Speed: 0 → 0x00, 0x00 + # Resolution: 100 → low=0x64, high=0x00 + # Distance: sqrt(200²+500²) = sqrt(290000) ≈ 538mm + # + # Target 3: No target (all zeros) + # Distance: 0 → sensors publish unknown/NaN + # + # Counts: target_count=2, moving_target_count=1, still_target_count=1 + # + # Frame layout (30 bytes): + # [0-3] AA FF 03 00 = periodic data header + # [4-11] Target 1 (8 bytes): X_L X_H Y_L Y_H SPD_L SPD_H RES_L RES_H + # [12-19] Target 2 (8 bytes) + # [20-27] Target 3 (8 bytes) + # [28-29] 55 CC = periodic data footer + - delay: 100ms + inject_rx: + [ + 0xAA, 0xFF, 0x03, 0x00, + 0xF4, 0x01, 0xE8, 0x83, 0x05, 0x00, 0x40, 0x01, + 0xC8, 0x80, 0xF4, 0x81, 0x00, 0x00, 0x64, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x55, 0xCC, + ] + + # Phase 2 (t=300ms): Garbage bytes + # LD2450's readline_ does NOT reject bytes at position 0 (unlike LD2412), + # so these bytes accumulate in the buffer. buffer_pos_ goes from 0 to 7. + - delay: 200ms + inject_rx: [0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11, 0x22] + + # Phase 3 (t=400ms): Truncated frame (header + partial data, no footer) + # More bytes accumulating in the buffer without a footer match. + # After this, buffer_pos_ = 7 + 8 = 15. + - delay: 100ms + inject_rx: [0xAA, 0xFF, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04] + + # Phase 4 (t=600ms): Overflow - inject 75 bytes of 0xFF (MAX_LINE_LENGTH=45) + # Buffer has 15 bytes from phases 2+3. + # readline_() stores bytes while buffer_pos_ < 44. When buffer_pos_ == 44, + # the next byte triggers overflow: logs warning, resets buffer_pos_ to 0, + # and discards that byte. + # + # First overflow: 29 bytes fill positions 15-43 (buffer_pos_=44), byte 30 + # triggers overflow (discarded). Total consumed: 30 bytes. + # Second overflow: 44 bytes fill positions 0-43 (buffer_pos_=44), byte 45 + # triggers overflow (discarded). Total consumed: 30+45 = 75 bytes. + # After both overflows, buffer_pos_ = 0 (clean state for recovery frame). + - delay: 200ms + inject_rx: + [ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + ] + + # Phase 5 (t=700ms): Valid frame after overflow - recovery test + # Buffer was reset by overflow. This valid frame should parse correctly. + # + # Target 1: X=300mm, Y=400mm, Speed=30mm/s (moving away), Res=100mm + # X: magnitude=300 (0x012C), positive → high=0x81, low=0x2C + # Y: magnitude=400 (0x0190), positive → high=0x81, low=0x90 + # Speed: raw=3, positive (moving away) → high=0x80, low=0x03, decoded=30mm/s + # Resolution: 100 → low=0x64, high=0x00 + # Distance: sqrt(300²+400²) = 500mm + # + # Target 2 & 3: No target (all zeros) + # Counts: target_count=1, moving_target_count=1, still_target_count=0 + - delay: 100ms + inject_rx: + [ + 0xAA, 0xFF, 0x03, 0x00, + 0x2C, 0x81, 0x90, 0x81, 0x03, 0x80, 0x64, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x55, 0xCC, + ] + +ld2450: + id: ld2450_dev + uart_id: mock_uart + +sensor: + - platform: ld2450 + ld2450_id: ld2450_dev + target_count: + name: "Target Count" + filters: &sensor_filters + - timeout: + timeout: 50ms + value: last + - throttle_with_priority: 50ms + still_target_count: + name: "Still Target Count" + filters: *sensor_filters + moving_target_count: + name: "Moving Target Count" + filters: *sensor_filters + target_1: + x: + name: "Target 1 X" + filters: *sensor_filters + y: + name: "Target 1 Y" + filters: *sensor_filters + speed: + name: "Target 1 Speed" + filters: *sensor_filters + distance: + name: "Target 1 Distance" + filters: *sensor_filters + resolution: + name: "Target 1 Resolution" + filters: *sensor_filters + angle: + name: "Target 1 Angle" + filters: *sensor_filters + target_2: + x: + name: "Target 2 X" + filters: *sensor_filters + y: + name: "Target 2 Y" + filters: *sensor_filters + speed: + name: "Target 2 Speed" + filters: *sensor_filters + distance: + name: "Target 2 Distance" + filters: *sensor_filters + +binary_sensor: + - platform: ld2450 + ld2450_id: ld2450_dev + has_target: + name: "Has Target" + filters: &binary_sensor_filters + - settle: 50ms + has_moving_target: + name: "Has Moving Target" + filters: *binary_sensor_filters + has_still_target: + name: "Has Still Target" + filters: *binary_sensor_filters + +text_sensor: + - platform: ld2450 + ld2450_id: ld2450_dev + target_1: + direction: + name: "Target 1 Direction" + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(mock_uart).start_scenario();' diff --git a/tests/integration/state_utils.py b/tests/integration/state_utils.py index e8c2cc5e66..ab9fdb01bb 100644 --- a/tests/integration/state_utils.py +++ b/tests/integration/state_utils.py @@ -13,6 +13,7 @@ from aioesphomeapi import ( EntityInfo, EntityState, SensorState, + TextSensorState, ) _LOGGER = logging.getLogger(__name__) @@ -244,12 +245,13 @@ class InitialStateHelper: class SensorStateCollector: - """Collects sensor and binary sensor state updates and provides wait helpers. + """Collects sensor, binary sensor, and text sensor state updates with wait helpers. Usage: collector = SensorStateCollector( sensor_names=["moving_distance", "still_distance"], binary_sensor_names=["has_target"], + text_sensor_names=["direction"], ) # Use collector.on_state as the callback (or wrap it) client.subscribe_states(helper.on_state_wrapper(collector.on_state)) @@ -259,18 +261,23 @@ class SensorStateCollector: # Access collected states assert collector.sensor_states["moving_distance"][0] == approx(100.0) + assert collector.text_sensor_states["direction"][0] == "Approaching" """ def __init__( self, sensor_names: list[str], binary_sensor_names: list[str] | None = None, + text_sensor_names: list[str] | None = None, entities: list[EntityInfo] | None = None, ) -> None: self.sensor_states: dict[str, list[float]] = {name: [] for name in sensor_names} self.binary_states: dict[str, list[bool]] = { name: [] for name in (binary_sensor_names or []) } + self.text_sensor_states: dict[str, list[str]] = { + name: [] for name in (text_sensor_names or []) + } self._key_to_sensor: dict[int, str] = {} self._waiters: list[tuple[Callable[[], bool], asyncio.Future[bool]]] = [] @@ -279,7 +286,11 @@ class SensorStateCollector: def build_key_mapping(self, entities: list[EntityInfo]) -> None: """Build key-to-name mapping from entities. Sorted by descending length.""" - all_names = list(self.sensor_states.keys()) + list(self.binary_states.keys()) + all_names = ( + list(self.sensor_states.keys()) + + list(self.binary_states.keys()) + + list(self.text_sensor_states.keys()) + ) all_names.sort(key=len, reverse=True) self._key_to_sensor = build_key_to_entity_mapping(entities, all_names) @@ -295,6 +306,11 @@ class SensorStateCollector: if sensor_name and sensor_name in self.binary_states: self.binary_states[sensor_name].append(state.state) self._check_waiters() + elif isinstance(state, TextSensorState) and not state.missing_state: + sensor_name = self._key_to_sensor.get(state.key) + if sensor_name and sensor_name in self.text_sensor_states: + self.text_sensor_states[sensor_name].append(state.state) + self._check_waiters() def _check_waiters(self) -> None: """Check all pending waiters and resolve any whose condition is met.""" @@ -303,9 +319,11 @@ class SensorStateCollector: future.set_result(True) def _all_have_values(self) -> bool: - """Check if all sensor and binary sensor lists have at least one value.""" - return all(len(v) >= 1 for v in self.sensor_states.values()) and all( - len(v) >= 1 for v in self.binary_states.values() + """Check if all sensor, binary sensor, and text sensor lists have at least one value.""" + return ( + all(len(v) >= 1 for v in self.sensor_states.values()) + and all(len(v) >= 1 for v in self.binary_states.values()) + and all(len(v) >= 1 for v in self.text_sensor_states.values()) ) async def wait_for_all(self, timeout: float = 3.0) -> None: diff --git a/tests/integration/test_uart_mock_ld2450.py b/tests/integration/test_uart_mock_ld2450.py new file mode 100644 index 0000000000..b1aa2f6952 --- /dev/null +++ b/tests/integration/test_uart_mock_ld2450.py @@ -0,0 +1,204 @@ +"""Integration test for LD2450 component with mock UART. + +Tests: +test_uart_mock_ld2450: + 1. Happy path - valid periodic data frame publishes correct target sensor values + 2. Multi-target tracking - verifies target count, moving/still counts + 3. Target coordinate decoding - signed X/Y coordinates with sign-magnitude encoding + 4. Speed decoding - approaching (negative) and stationary (zero) targets + 5. Distance calculation - computed from X/Y via sqrt(x²+y²) + 6. Direction text sensor - "Approaching" for negative speed target + 7. Garbage resilience - random bytes don't crash the component + 8. Truncated frame handling - partial frame doesn't corrupt state + 9. Buffer overflow recovery - overflow resets the parser + 10. Post-overflow parsing - next valid frame after overflow is parsed correctly + 11. TX logging - verifies LD2450 sends expected setup commands +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from aioesphomeapi import ButtonInfo +import pytest + +from .state_utils import InitialStateHelper, SensorStateCollector, find_entity +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_uart_mock_ld2450( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test LD2450 data parsing with happy path, garbage, overflow, and recovery.""" + # Replace external component path placeholder + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + loop = asyncio.get_running_loop() + + # Track overflow warning in logs + overflow_seen = loop.create_future() + + # Track TX data logged by the mock for assertions + tx_log_lines: list[str] = [] + + def line_callback(line: str) -> None: + if "Max command length exceeded" in line and not overflow_seen.done(): + overflow_seen.set_result(True) + # Capture all TX log lines from uart_mock + if "uart_mock" in line and "TX " in line: + tx_log_lines.append(line) + + collector = SensorStateCollector( + sensor_names=[ + "target_1_x", + "target_1_y", + "target_1_speed", + "target_1_distance", + "target_1_resolution", + "target_1_angle", + "target_2_x", + "target_2_y", + "target_2_speed", + "target_2_distance", + "target_count", + "still_target_count", + "moving_target_count", + ], + binary_sensor_names=[ + "has_target", + "has_moving_target", + "has_still_target", + ], + text_sensor_names=[ + "target_1_direction", + ], + ) + + # Signal when we see recovery frame values (target 1 distance ≈ 500mm) + recovery_received = collector.add_waiter( + lambda: ( + pytest.approx(500.0, abs=1.0) + in collector.sensor_states["target_1_distance"] + ) + ) + + async with ( + run_compiled(yaml_config, line_callback=line_callback), + api_client_connected() as client, + ): + entities, _ = await client.list_entities_services() + collector.build_key_mapping(entities) + + # Set up initial state helper + initial_state_helper = InitialStateHelper(entities) + client.subscribe_states( + initial_state_helper.on_state_wrapper(collector.on_state) + ) + + try: + await initial_state_helper.wait_for_initial_states() + except TimeoutError: + pytest.fail("Timeout waiting for initial states") + + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) + + # Wait for Phase 1 - all sensors and binary sensors have at least one value + try: + await collector.wait_for_all(timeout=5.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 1 frame. Received:\n" + f" sensor_states: {collector.sensor_states}\n" + f" binary_states: {collector.binary_states}\n" + f" text_states: {collector.text_sensor_states}" + ) + + # Phase 1 values: + # Target 1: X=-500, Y=1000, Speed=-50 (approaching), Res=320 + # Distance = sqrt(500²+1000²) ≈ 1118mm + assert collector.sensor_states["target_1_x"][0] == pytest.approx(-500.0) + assert collector.sensor_states["target_1_y"][0] == pytest.approx(1000.0) + assert collector.sensor_states["target_1_speed"][0] == pytest.approx(-50.0) + assert collector.sensor_states["target_1_resolution"][0] == pytest.approx(320.0) + # Distance computed from X/Y + assert collector.sensor_states["target_1_distance"][0] == pytest.approx( + 1118.0, abs=1.0 + ) + + # Target 2: X=200, Y=500, Speed=0 (stationary), Res=100 + # Distance = sqrt(200²+500²) ≈ 538mm + assert collector.sensor_states["target_2_x"][0] == pytest.approx(200.0) + assert collector.sensor_states["target_2_y"][0] == pytest.approx(500.0) + assert collector.sensor_states["target_2_speed"][0] == pytest.approx(0.0) + assert collector.sensor_states["target_2_distance"][0] == pytest.approx( + 538.0, abs=1.0 + ) + + # Target counts: 2 targets total, 1 moving, 1 still + assert collector.sensor_states["target_count"][0] == pytest.approx(2.0) + assert collector.sensor_states["moving_target_count"][0] == pytest.approx(1.0) + assert collector.sensor_states["still_target_count"][0] == pytest.approx(1.0) + + # Binary sensors: all true (targets detected) + assert collector.binary_states["has_target"][0] is True + assert collector.binary_states["has_moving_target"][0] is True + assert collector.binary_states["has_still_target"][0] is True + + # Direction text sensor: Target 1 is approaching (speed < 0) + assert collector.text_sensor_states["target_1_direction"][0] == "Approaching" + + # Wait for the recovery frame (Phase 5) to be parsed + # This proves the component survived garbage + truncated + overflow + try: + await asyncio.wait_for(recovery_received, timeout=5.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for recovery frame. Received:\n" + f" sensor_states: {collector.sensor_states}" + ) + + # Verify overflow warning was logged + assert overflow_seen.done(), ( + "Expected 'Max command length exceeded' warning in logs" + ) + + # Verify LD2450 sent setup commands (TX logging) + assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock" + tx_data = " ".join(tx_log_lines) + # Verify command frame header appears (FD:FC:FB:FA) + assert "FD:FC:FB:FA" in tx_data, ( + "Expected LD2450 command frame header FD:FC:FB:FA in TX log" + ) + # Verify command frame footer appears (04:03:02:01) + assert "04:03:02:01" in tx_data, ( + "Expected LD2450 command frame footer 04:03:02:01 in TX log" + ) + + # Recovery frame values (Phase 5, after overflow): + # Target 1: X=300, Y=400, Distance=500, Speed=30 (moving away) + # target_count=1, moving=1, still=0 + # + # Note: throttle filters cause sensor lists to have different lengths, + # so we check each value appeared somewhere rather than using a shared index. + assert ( + pytest.approx(500.0, abs=1.0) + in collector.sensor_states["target_1_distance"] + ) + assert pytest.approx(300.0) in collector.sensor_states["target_1_x"] + assert pytest.approx(400.0) in collector.sensor_states["target_1_y"] + assert pytest.approx(30.0) in collector.sensor_states["target_1_speed"] + assert pytest.approx(1.0) in collector.sensor_states["target_count"] + assert pytest.approx(1.0) in collector.sensor_states["moving_target_count"] + assert pytest.approx(0.0) in collector.sensor_states["still_target_count"]