diff --git a/tests/integration/fixtures/external_components/uart_mock/__init__.py b/tests/integration/fixtures/external_components/uart_mock/__init__.py index 8deab4c21e..abb3abcc41 100644 --- a/tests/integration/fixtures/external_components/uart_mock/__init__.py +++ b/tests/integration/fixtures/external_components/uart_mock/__init__.py @@ -43,6 +43,7 @@ CONF_INJECT_RX = "inject_rx" CONF_EXPECT_TX = "expect_tx" CONF_PERIODIC_RX = "periodic_rx" CONF_ON_TX = "on_tx" +CONF_AUTO_START = "auto_start" UART_PARITY_OPTIONS = { "NONE": uart.UARTParityOptions.UART_CONFIG_PARITY_NONE, @@ -95,6 +96,7 @@ CONFIG_SCHEMA = cv.Schema( cv.Optional(CONF_INJECTIONS, default=[]): cv.ensure_list(INJECTION_SCHEMA), cv.Optional(CONF_RESPONSES, default=[]): cv.ensure_list(RESPONSE_SCHEMA), cv.Optional(CONF_PERIODIC_RX, default=[]): cv.ensure_list(PERIODIC_RX_SCHEMA), + cv.Optional(CONF_AUTO_START, default=True): cv.boolean, cv.Optional(CONF_ON_TX): automation.validate_automation( { cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(MockUartTXTrigger), @@ -138,6 +140,9 @@ async def to_code(config): cg.add(var.set_data_bits(config[CONF_DATA_BITS])) cg.add(var.set_parity(config[CONF_PARITY])) + if not config[CONF_AUTO_START]: + cg.add(var.set_auto_start(False)) + for injection in config[CONF_INJECTIONS]: rx_data = injection[CONF_INJECT_RX] delay_ms = injection[CONF_DELAY] diff --git a/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp b/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp index a4a1c41234..83a13793be 100644 --- a/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp +++ b/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp @@ -16,17 +16,21 @@ void MockUartComponent::setup() { } void MockUartComponent::loop() { - uint32_t now = App.get_loop_component_start_time(); - - // Initialize scenario start time on first loop() call, after all components have - // finished setup(). This prevents injection delays from being consumed during setup. if (!this->loop_started_) { this->loop_started_ = true; - this->scenario_start_ms_ = now; - this->cumulative_delay_ms_ = 0; - ESP_LOGD(TAG, "Scenario started at %u ms", now); + if (this->auto_start_) { + this->start_scenario(); + } else { + ESP_LOGD(TAG, "Scenario waiting for manual start"); + } } + if (!this->scenario_active_) { + return; + } + + uint32_t now = App.get_loop_component_start_time(); + // Process at most ONE timed injection per loop iteration. // This ensures each injection is in a separate loop cycle, giving the consuming // component (e.g., LD2410) a chance to process each batch independently. @@ -50,6 +54,19 @@ void MockUartComponent::loop() { } } +void MockUartComponent::start_scenario() { + uint32_t now = App.get_loop_component_start_time(); + this->scenario_active_ = true; + this->scenario_start_ms_ = now; + this->cumulative_delay_ms_ = 0; + this->injection_index_ = 0; + this->tx_buffer_.clear(); + for (auto &periodic : this->periodic_rx_) { + periodic.last_inject_ms = now; + } + ESP_LOGD(TAG, "Scenario started at %u ms", now); +} + void MockUartComponent::dump_config() { ESP_LOGCONFIG(TAG, "Mock UART Component:\n" @@ -78,10 +95,12 @@ void MockUartComponent::write_array(const uint8_t *data, size_t len) { } #endif - this->try_match_response_(); + if (this->scenario_active_) { + this->try_match_response_(); + } // This directly calls a tx_hook (lambda) as an alternative to the simpler match_response mechanism. - if (this->tx_hook_) { + if (this->tx_hook_ && this->scenario_active_) { std::vector buf(data, data + len); this->tx_hook_(buf); } diff --git a/tests/integration/fixtures/external_components/uart_mock/uart_mock.h b/tests/integration/fixtures/external_components/uart_mock/uart_mock.h index 5bbc3c1bf6..b721512f96 100644 --- a/tests/integration/fixtures/external_components/uart_mock/uart_mock.h +++ b/tests/integration/fixtures/external_components/uart_mock/uart_mock.h @@ -37,6 +37,8 @@ class MockUartComponent : public uart::UARTComponent, public Component { void add_response(const std::vector &expect_tx, const std::vector &inject_rx); void add_periodic_rx(const std::vector &data, uint32_t interval_ms); + void start_scenario(); + void set_auto_start(bool auto_start) { this->auto_start_ = auto_start; } void set_tx_hook(std::function &)> &&cb) { this->tx_hook_ = std::move(cb); } void inject_to_rx_buffer(const std::vector &data); void inject_to_rx_buffer(const uint8_t *data, size_t len); @@ -55,6 +57,8 @@ class MockUartComponent : public uart::UARTComponent, public Component { uint32_t scenario_start_ms_{0}; uint32_t cumulative_delay_ms_{0}; bool loop_started_{false}; + bool auto_start_{true}; + bool scenario_active_{false}; // TX-triggered responses struct Response { diff --git a/tests/integration/fixtures/uart_mock_ld2410.yaml b/tests/integration/fixtures/uart_mock_ld2410.yaml index 9a81468263..59838b0599 100644 --- a/tests/integration/fixtures/uart_mock_ld2410.yaml +++ b/tests/integration/fixtures/uart_mock_ld2410.yaml @@ -20,6 +20,7 @@ uart: uart_mock: id: mock_uart baud_rate: 256000 + auto_start: false injections: # Phase 1 (t=100ms): Valid LD2410 normal mode data frame - happy path # The buffer is clean at this point, so this frame should parse correctly. @@ -143,3 +144,10 @@ binary_sensor: name: "Has Moving Target" has_still_target: name: "Has Still Target" + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(mock_uart).start_scenario();' diff --git a/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml b/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml index 3b730fc1f8..4625ae8511 100644 --- a/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml +++ b/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml @@ -19,6 +19,7 @@ uart: uart_mock: id: mock_uart baud_rate: 256000 + auto_start: false injections: # Phase 1 (t=100ms): Valid LD2410 engineering mode data frame # Captured from a real Screek Human Presence Sensor 1U with LD2410 firmware 2.4.x @@ -154,3 +155,10 @@ binary_sensor: name: "Has Still Target" out_pin_presence_status: name: "Out Pin Presence" + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(mock_uart).start_scenario();' diff --git a/tests/integration/fixtures/uart_mock_ld2412.yaml b/tests/integration/fixtures/uart_mock_ld2412.yaml index a502f36a25..9cf9d6bb87 100644 --- a/tests/integration/fixtures/uart_mock_ld2412.yaml +++ b/tests/integration/fixtures/uart_mock_ld2412.yaml @@ -20,6 +20,7 @@ uart: uart_mock: id: mock_uart baud_rate: 256000 + auto_start: false injections: # Phase 1 (t=100ms): Valid LD2412 normal mode data frame - happy path # The buffer is clean at this point, so this frame should parse correctly. @@ -169,3 +170,10 @@ binary_sensor: name: "Has Still Target" filters: - settle: 50ms + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(mock_uart).start_scenario();' diff --git a/tests/integration/fixtures/uart_mock_ld2412_engineering.yaml b/tests/integration/fixtures/uart_mock_ld2412_engineering.yaml index 3c669fc9a9..103dbed132 100644 --- a/tests/integration/fixtures/uart_mock_ld2412_engineering.yaml +++ b/tests/integration/fixtures/uart_mock_ld2412_engineering.yaml @@ -19,6 +19,7 @@ uart: uart_mock: id: mock_uart baud_rate: 256000 + auto_start: false injections: # Phase 1 (t=100ms): Valid LD2412 engineering mode data frame # @@ -211,3 +212,10 @@ binary_sensor: name: "Has Still Target" filters: - settle: 50ms + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(mock_uart).start_scenario();' diff --git a/tests/integration/fixtures/uart_mock_modbus.yaml b/tests/integration/fixtures/uart_mock_modbus.yaml index 89b9b91861..0a3492a0d2 100644 --- a/tests/integration/fixtures/uart_mock_modbus.yaml +++ b/tests/integration/fixtures/uart_mock_modbus.yaml @@ -22,6 +22,7 @@ uart_mock: baud_rate: 9600 rx_full_threshold: 120 rx_timeout: 2 + auto_start: false debug: responses: - expect_tx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 1 on device 1 @@ -38,3 +39,10 @@ sensor: name: "basic_register" address: 0x03 register_type: holding + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(virtual_uart_dev).start_scenario();' diff --git a/tests/integration/fixtures/uart_mock_modbus_timing.yaml b/tests/integration/fixtures/uart_mock_modbus_timing.yaml index cd485ca394..c4e29e5fe8 100644 --- a/tests/integration/fixtures/uart_mock_modbus_timing.yaml +++ b/tests/integration/fixtures/uart_mock_modbus_timing.yaml @@ -22,6 +22,7 @@ uart_mock: baud_rate: 9600 rx_full_threshold: 120 rx_timeout: 2 + auto_start: false debug: on_tx: - then: @@ -52,3 +53,10 @@ sensor: phase_a: voltage: name: sdm_voltage + +button: + - platform: template + name: "Start Scenario" + id: start_scenario_btn + on_press: + - lambda: 'id(virtual_uart_dev).start_scenario();' diff --git a/tests/integration/state_utils.py b/tests/integration/state_utils.py index b649056f2b..e8c2cc5e66 100644 --- a/tests/integration/state_utils.py +++ b/tests/integration/state_utils.py @@ -3,10 +3,17 @@ from __future__ import annotations import asyncio +from collections.abc import Callable import logging from typing import TypeVar -from aioesphomeapi import ButtonInfo, EntityInfo, EntityState +from aioesphomeapi import ( + BinarySensorState, + ButtonInfo, + EntityInfo, + EntityState, + SensorState, +) _LOGGER = logging.getLogger(__name__) @@ -234,3 +241,90 @@ class InitialStateHelper: asyncio.TimeoutError: If initial states aren't received within timeout """ await asyncio.wait_for(self._initial_states_received, timeout=timeout) + + +class SensorStateCollector: + """Collects sensor and binary sensor state updates and provides wait helpers. + + Usage: + collector = SensorStateCollector( + sensor_names=["moving_distance", "still_distance"], + binary_sensor_names=["has_target"], + ) + # Use collector.on_state as the callback (or wrap it) + client.subscribe_states(helper.on_state_wrapper(collector.on_state)) + + # Wait for all sensors to have at least one value + await collector.wait_for_all(timeout=3.0) + + # Access collected states + assert collector.sensor_states["moving_distance"][0] == approx(100.0) + """ + + def __init__( + self, + sensor_names: list[str], + binary_sensor_names: list[str] | None = None, + entities: list[EntityInfo] | None = None, + ) -> None: + self.sensor_states: dict[str, list[float]] = {name: [] for name in sensor_names} + self.binary_states: dict[str, list[bool]] = { + name: [] for name in (binary_sensor_names or []) + } + self._key_to_sensor: dict[int, str] = {} + self._waiters: list[tuple[Callable[[], bool], asyncio.Future[bool]]] = [] + + if entities is not None: + self.build_key_mapping(entities) + + def build_key_mapping(self, entities: list[EntityInfo]) -> None: + """Build key-to-name mapping from entities. Sorted by descending length.""" + all_names = list(self.sensor_states.keys()) + list(self.binary_states.keys()) + all_names.sort(key=len, reverse=True) + self._key_to_sensor = build_key_to_entity_mapping(entities, all_names) + + def on_state(self, state: EntityState) -> None: + """Process a state update.""" + if isinstance(state, SensorState) and not state.missing_state: + sensor_name = self._key_to_sensor.get(state.key) + if sensor_name and sensor_name in self.sensor_states: + self.sensor_states[sensor_name].append(state.state) + self._check_waiters() + elif isinstance(state, BinarySensorState): + sensor_name = self._key_to_sensor.get(state.key) + if sensor_name and sensor_name in self.binary_states: + self.binary_states[sensor_name].append(state.state) + self._check_waiters() + + def _check_waiters(self) -> None: + """Check all pending waiters and resolve any whose condition is met.""" + for condition, future in self._waiters: + if not future.done() and condition(): + future.set_result(True) + + def _all_have_values(self) -> bool: + """Check if all sensor and binary sensor lists have at least one value.""" + return all(len(v) >= 1 for v in self.sensor_states.values()) and all( + len(v) >= 1 for v in self.binary_states.values() + ) + + async def wait_for_all(self, timeout: float = 3.0) -> None: + """Wait until all sensors and binary sensors have at least one value.""" + if self._all_have_values(): + return + future: asyncio.Future[bool] = asyncio.get_running_loop().create_future() + self._waiters.append((self._all_have_values, future)) + await asyncio.wait_for(future, timeout=timeout) + + def add_waiter(self, condition: Callable[[], bool]) -> asyncio.Future[bool]: + """Add a custom waiter that resolves when condition returns True. + + Returns: + A future that resolves when the condition is met. + """ + future: asyncio.Future[bool] = asyncio.get_running_loop().create_future() + if condition(): + future.set_result(True) + else: + self._waiters.append((condition, future)) + return future diff --git a/tests/integration/test_uart_mock_ld2410.py b/tests/integration/test_uart_mock_ld2410.py index e01d6ff8e8..ce0e1bb7ec 100644 --- a/tests/integration/test_uart_mock_ld2410.py +++ b/tests/integration/test_uart_mock_ld2410.py @@ -21,16 +21,10 @@ from __future__ import annotations import asyncio from pathlib import Path -from aioesphomeapi import ( - BinarySensorInfo, - BinarySensorState, - EntityState, - SensorInfo, - SensorState, -) +from aioesphomeapi import ButtonInfo import pytest -from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity +from .state_utils import InitialStateHelper, SensorStateCollector, find_entity from .types import APIClientConnectedFactory, RunCompiledFunction @@ -64,100 +58,65 @@ async def test_uart_mock_ld2410( if "uart_mock" in line and "TX " in line: tx_log_lines.append(line) - # Track sensor state updates (after initial state is swallowed) - sensor_states: dict[str, list[float]] = { - "moving_distance": [], - "still_distance": [], - "moving_energy": [], - "still_energy": [], - "detection_distance": [], - } - binary_states: dict[str, list[bool]] = { - "has_target": [], - "has_moving_target": [], - "has_still_target": [], - } + collector = SensorStateCollector( + sensor_names=[ + "moving_distance", + "still_distance", + "moving_energy", + "still_energy", + "detection_distance", + ], + binary_sensor_names=[ + "has_target", + "has_moving_target", + "has_still_target", + ], + ) # Signal when we see recovery frame values - recovery_received = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, SensorState) and not state.missing_state: - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in sensor_states: - sensor_states[sensor_name].append(state.state) - # Check if this is the recovery frame (moving_distance = 50) - if ( - sensor_name == "moving_distance" - and state.state == pytest.approx(50.0) - and not recovery_received.done() - ): - recovery_received.set_result(True) - elif isinstance(state, BinarySensorState): - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in binary_states: - binary_states[sensor_name].append(state.state) + recovery_received = collector.add_waiter( + lambda: pytest.approx(50.0) in collector.sensor_states["moving_distance"] + ) async with ( run_compiled(yaml_config, line_callback=line_callback), api_client_connected() as client, ): entities, _ = await client.list_entities_services() - - # Build key mappings for all sensor types - all_names = list(sensor_states.keys()) + list(binary_states.keys()) - key_to_sensor = build_key_to_entity_mapping(entities, all_names) + collector.build_key_mapping(entities) # Set up initial state helper initial_state_helper = InitialStateHelper(entities) - client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + client.subscribe_states( + initial_state_helper.on_state_wrapper(collector.on_state) + ) try: await initial_state_helper.wait_for_initial_states() except TimeoutError: pytest.fail("Timeout waiting for initial states") - # Phase 1 values are in the initial states (swallowed by InitialStateHelper). - # Verify them via initial_states dict. - moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) - assert moving_dist_entity is not None - initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) - assert initial_moving is not None and isinstance(initial_moving, SensorState) - assert initial_moving.state == pytest.approx(100.0), ( - f"Initial moving distance should be 100, got {initial_moving.state}" - ) + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) - still_dist_entity = find_entity(entities, "still_distance", SensorInfo) - assert still_dist_entity is not None - initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) - assert initial_still is not None and isinstance(initial_still, SensorState) - assert initial_still.state == pytest.approx(120.0), ( - f"Initial still distance should be 120, got {initial_still.state}" - ) + # Wait for Phase 1 - all sensors and binary sensors have at least one value + try: + await collector.wait_for_all(timeout=3.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 1 frame. Received:\n" + f" sensor_states: {collector.sensor_states}\n" + f" binary_states: {collector.binary_states}" + ) - moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo) - assert moving_energy_entity is not None - initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key) - assert initial_me is not None and isinstance(initial_me, SensorState) - assert initial_me.state == pytest.approx(50.0), ( - f"Initial moving energy should be 50, got {initial_me.state}" - ) - - still_energy_entity = find_entity(entities, "still_energy", SensorInfo) - assert still_energy_entity is not None - initial_se = initial_state_helper.initial_states.get(still_energy_entity.key) - assert initial_se is not None and isinstance(initial_se, SensorState) - assert initial_se.state == pytest.approx(25.0), ( - f"Initial still energy should be 25, got {initial_se.state}" - ) - - detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo) - assert detect_dist_entity is not None - initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key) - assert initial_dd is not None and isinstance(initial_dd, SensorState) - assert initial_dd.state == pytest.approx(300.0), ( - f"Initial detection distance should be 300, got {initial_dd.state}" - ) + # Phase 1 values: moving=100, still=120, energy=50/25, detect=300 + assert collector.sensor_states["moving_distance"][0] == pytest.approx(100.0) + assert collector.sensor_states["still_distance"][0] == pytest.approx(120.0) + assert collector.sensor_states["moving_energy"][0] == pytest.approx(50.0) + assert collector.sensor_states["still_energy"][0] == pytest.approx(25.0) + assert collector.sensor_states["detection_distance"][0] == pytest.approx(300.0) # Wait for the recovery frame (Phase 5) to be parsed # This proves the component survived garbage + truncated + overflow @@ -165,12 +124,8 @@ async def test_uart_mock_ld2410( await asyncio.wait_for(recovery_received, timeout=15.0) except TimeoutError: pytest.fail( - f"Timeout waiting for recovery frame. Received sensor states:\n" - f" moving_distance: {sensor_states['moving_distance']}\n" - f" still_distance: {sensor_states['still_distance']}\n" - f" moving_energy: {sensor_states['moving_energy']}\n" - f" still_energy: {sensor_states['still_energy']}\n" - f" detection_distance: {sensor_states['detection_distance']}" + f"Timeout waiting for recovery frame. Received:\n" + f" sensor_states: {collector.sensor_states}" ) # Verify overflow warning was logged @@ -183,67 +138,36 @@ async def test_uart_mock_ld2410( # A5 (MAC), AB (distance res), AE (light), 61 (params), FE (config off) assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock" tx_data = " ".join(tx_log_lines) - # Verify command frame header appears (FD:FC:FB:FA) assert "FD:FC:FB:FA" in tx_data, ( "Expected LD2410 command frame header FD:FC:FB:FA in TX log" ) - # Verify command frame footer appears (04:03:02:01) assert "04:03:02:01" in tx_data, ( "Expected LD2410 command frame footer 04:03:02:01 in TX log" ) - # Recovery frame values (Phase 5, after overflow) - assert len(sensor_states["moving_distance"]) >= 1, ( - f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}" - ) - # Find the recovery value (moving_distance = 50) - recovery_values = [ - v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0) - ] - assert len(recovery_values) >= 1, ( - f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}" - ) - # Recovery frame: moving=50, still=75, energy=100/80, detect=127 recovery_idx = next( i - for i, v in enumerate(sensor_states["moving_distance"]) + for i, v in enumerate(collector.sensor_states["moving_distance"]) if v == pytest.approx(50.0) ) - assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), ( - f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}" + assert collector.sensor_states["still_distance"][recovery_idx] == pytest.approx( + 75.0 ) - assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), ( - f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}" + assert collector.sensor_states["moving_energy"][recovery_idx] == pytest.approx( + 100.0 ) - assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), ( - f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}" - ) - assert sensor_states["detection_distance"][recovery_idx] == pytest.approx( - 127.0 - ), ( - f"Recovery detection distance should be 127, got {sensor_states['detection_distance'][recovery_idx]}" + assert collector.sensor_states["still_energy"][recovery_idx] == pytest.approx( + 80.0 ) + assert collector.sensor_states["detection_distance"][ + recovery_idx + ] == pytest.approx(127.0) - # Verify binary sensors detected targets - # Binary sensors could be in initial states or forwarded states - has_target_entity = find_entity(entities, "has_target", BinarySensorInfo) - assert has_target_entity is not None - initial_ht = initial_state_helper.initial_states.get(has_target_entity.key) - assert initial_ht is not None and isinstance(initial_ht, BinarySensorState) - assert initial_ht.state is True, "Has target should be True" - - has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo) - assert has_moving_entity is not None - initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key) - assert initial_hm is not None and isinstance(initial_hm, BinarySensorState) - assert initial_hm.state is True, "Has moving target should be True" - - has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo) - assert has_still_entity is not None - initial_hs = initial_state_helper.initial_states.get(has_still_entity.key) - assert initial_hs is not None and isinstance(initial_hs, BinarySensorState) - assert initial_hs.state is True, "Has still target should be True" + # Verify binary sensors detected targets (from Phase 1 frame) + assert collector.binary_states["has_target"][0] is True + assert collector.binary_states["has_moving_target"][0] is True + assert collector.binary_states["has_still_target"][0] is True @pytest.mark.asyncio @@ -260,133 +184,82 @@ async def test_uart_mock_ld2410_engineering( "EXTERNAL_COMPONENT_PATH", external_components_path ) - loop = asyncio.get_running_loop() + collector = SensorStateCollector( + sensor_names=[ + "moving_distance", + "still_distance", + "moving_energy", + "still_energy", + "detection_distance", + "light", + "gate_0_move_energy", + "gate_1_move_energy", + "gate_2_move_energy", + "gate_0_still_energy", + "gate_1_still_energy", + "gate_2_still_energy", + ], + binary_sensor_names=[ + "has_target", + "has_moving_target", + "has_still_target", + "out_pin_presence", + ], + ) - # Track sensor state updates (after initial state is swallowed) - sensor_states: dict[str, list[float]] = { - "moving_distance": [], - "still_distance": [], - "moving_energy": [], - "still_energy": [], - "detection_distance": [], - "light": [], - "gate_0_move_energy": [], - "gate_1_move_energy": [], - "gate_2_move_energy": [], - "gate_0_still_energy": [], - "gate_1_still_energy": [], - "gate_2_still_energy": [], - } - binary_states: dict[str, list[bool]] = { - "has_target": [], - "has_moving_target": [], - "has_still_target": [], - "out_pin_presence": [], - } - - # Signal when we see Phase 3 frame (still_distance = 291) - phase3_received = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, SensorState) and not state.missing_state: - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in sensor_states: - sensor_states[sensor_name].append(state.state) - if ( - sensor_name == "still_distance" - and state.state == pytest.approx(291.0) - and not phase3_received.done() - ): - phase3_received.set_result(True) - elif isinstance(state, BinarySensorState): - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in binary_states: - binary_states[sensor_name].append(state.state) + # Signal when we see Phase 3 frame values + phase3_received = collector.add_waiter( + lambda: pytest.approx(291.0) in collector.sensor_states["still_distance"] + ) async with ( run_compiled(yaml_config), api_client_connected() as client, ): entities, _ = await client.list_entities_services() - - all_names = list(sensor_states.keys()) + list(binary_states.keys()) - key_to_sensor = build_key_to_entity_mapping(entities, all_names) + collector.build_key_mapping(entities) initial_state_helper = InitialStateHelper(entities) - client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + client.subscribe_states( + initial_state_helper.on_state_wrapper(collector.on_state) + ) try: await initial_state_helper.wait_for_initial_states() except TimeoutError: pytest.fail("Timeout waiting for initial states") - # Phase 1 initial values (engineering mode frame): + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) + + # Wait for Phase 1 - all sensors and binary sensors have at least one value + try: + await collector.wait_for_all(timeout=3.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 1 frame. Received:\n" + f" sensor_states: {collector.sensor_states}\n" + f" binary_states: {collector.binary_states}" + ) + + # Phase 1 values (engineering mode frame): # moving=30, energy=100, still=30, energy=100, detect=0 - moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) - assert moving_dist_entity is not None - initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) - assert initial_moving is not None and isinstance(initial_moving, SensorState) - assert initial_moving.state == pytest.approx(30.0), ( - f"Initial moving distance should be 30, got {initial_moving.state}" - ) - - still_dist_entity = find_entity(entities, "still_distance", SensorInfo) - assert still_dist_entity is not None - initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) - assert initial_still is not None and isinstance(initial_still, SensorState) - assert initial_still.state == pytest.approx(30.0), ( - f"Initial still distance should be 30, got {initial_still.state}" - ) - - # Verify engineering mode sensors from initial state - # Gate 0 moving energy = 0x64 = 100 - gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo) - assert gate0_move_entity is not None - initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key) - assert initial_g0m is not None and isinstance(initial_g0m, SensorState) - assert initial_g0m.state == pytest.approx(100.0), ( - f"Gate 0 move energy should be 100, got {initial_g0m.state}" - ) - - # Gate 1 moving energy = 0x41 = 65 - gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo) - assert gate1_move_entity is not None - initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key) - assert initial_g1m is not None and isinstance(initial_g1m, SensorState) - assert initial_g1m.state == pytest.approx(65.0), ( - f"Gate 1 move energy should be 65, got {initial_g1m.state}" - ) - - # Light sensor = 0x57 = 87 - light_entity = find_entity(entities, "light", SensorInfo) - assert light_entity is not None - initial_light = initial_state_helper.initial_states.get(light_entity.key) - assert initial_light is not None and isinstance(initial_light, SensorState) - assert initial_light.state == pytest.approx(87.0), ( - f"Light sensor should be 87, got {initial_light.state}" - ) - - # Out pin presence = 0x01 = True - out_pin_entity = find_entity(entities, "out_pin_presence", BinarySensorInfo) - assert out_pin_entity is not None - initial_out = initial_state_helper.initial_states.get(out_pin_entity.key) - assert initial_out is not None and isinstance(initial_out, BinarySensorState) - assert initial_out.state is True, "Out pin presence should be True" + assert collector.sensor_states["moving_distance"][0] == pytest.approx(30.0) + assert collector.sensor_states["still_distance"][0] == pytest.approx(30.0) + assert collector.sensor_states["gate_0_move_energy"][0] == pytest.approx(100.0) + assert collector.sensor_states["gate_1_move_energy"][0] == pytest.approx(65.0) + assert collector.sensor_states["light"][0] == pytest.approx(87.0) + assert collector.binary_states["out_pin_presence"][0] is True # Wait for Phase 3 frame (still_distance = 291cm, multi-byte) try: await asyncio.wait_for(phase3_received, timeout=15.0) except TimeoutError: pytest.fail( - f"Timeout waiting for Phase 3 frame. Received sensor states:\n" - f" still_distance: {sensor_states['still_distance']}\n" - f" moving_distance: {sensor_states['moving_distance']}" + f"Timeout waiting for Phase 3 frame. Received:\n" + f" still_distance: {collector.sensor_states['still_distance']}" ) - # Phase 3: still distance = 0x0123 = 291cm (multi-byte distance test) - phase3_still = [ - v for v in sensor_states["still_distance"] if v == pytest.approx(291.0) - ] - assert len(phase3_still) >= 1, ( - f"Expected still_distance=291, got: {sensor_states['still_distance']}" - ) + assert pytest.approx(291.0) in collector.sensor_states["still_distance"] diff --git a/tests/integration/test_uart_mock_ld2412.py b/tests/integration/test_uart_mock_ld2412.py index cf7324ceed..a964ba0073 100644 --- a/tests/integration/test_uart_mock_ld2412.py +++ b/tests/integration/test_uart_mock_ld2412.py @@ -21,16 +21,10 @@ from __future__ import annotations import asyncio from pathlib import Path -from aioesphomeapi import ( - BinarySensorInfo, - BinarySensorState, - EntityState, - SensorInfo, - SensorState, -) +from aioesphomeapi import ButtonInfo import pytest -from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity +from .state_utils import InitialStateHelper, SensorStateCollector, find_entity from .types import APIClientConnectedFactory, RunCompiledFunction @@ -64,104 +58,65 @@ async def test_uart_mock_ld2412( if "uart_mock" in line and "TX " in line: tx_log_lines.append(line) - # Track sensor state updates (after initial state is swallowed) - sensor_states: dict[str, list[float]] = { - "moving_distance": [], - "still_distance": [], - "moving_energy": [], - "still_energy": [], - "detection_distance": [], - } - binary_states: dict[str, list[bool]] = { - "has_target": [], - "has_moving_target": [], - "has_still_target": [], - } + collector = SensorStateCollector( + sensor_names=[ + "moving_distance", + "still_distance", + "moving_energy", + "still_energy", + "detection_distance", + ], + binary_sensor_names=[ + "has_target", + "has_moving_target", + "has_still_target", + ], + ) # Signal when we see recovery frame values - recovery_received = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, SensorState) and not state.missing_state: - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in sensor_states: - sensor_states[sensor_name].append(state.state) - # Check if this is the recovery frame (moving_distance = 50) - if ( - sensor_name == "moving_distance" - and state.state == pytest.approx(50.0) - and not recovery_received.done() - ): - recovery_received.set_result(True) - elif isinstance(state, BinarySensorState): - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in binary_states: - binary_states[sensor_name].append(state.state) + recovery_received = collector.add_waiter( + lambda: pytest.approx(50.0) in collector.sensor_states["moving_distance"] + ) async with ( run_compiled(yaml_config, line_callback=line_callback), api_client_connected() as client, ): entities, _ = await client.list_entities_services() - - # Build key mappings for all sensor types - all_names = list(sensor_states.keys()) + list(binary_states.keys()) - # Sort by descending length to avoid substring collisions - # (e.g., "still_energy" matching "gate_0_still_energy") - all_names.sort(key=len, reverse=True) - key_to_sensor = build_key_to_entity_mapping(entities, all_names) + collector.build_key_mapping(entities) # Set up initial state helper initial_state_helper = InitialStateHelper(entities) - client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + client.subscribe_states( + initial_state_helper.on_state_wrapper(collector.on_state) + ) try: await initial_state_helper.wait_for_initial_states() except TimeoutError: pytest.fail("Timeout waiting for initial states") - # Phase 1 values are in the initial states (swallowed by InitialStateHelper). - # Verify them via initial_states dict. - moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) - assert moving_dist_entity is not None - initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) - assert initial_moving is not None and isinstance(initial_moving, SensorState) - assert initial_moving.state == pytest.approx(100.0), ( - f"Initial moving distance should be 100, got {initial_moving.state}" - ) + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) - still_dist_entity = find_entity(entities, "still_distance", SensorInfo) - assert still_dist_entity is not None - initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) - assert initial_still is not None and isinstance(initial_still, SensorState) - assert initial_still.state == pytest.approx(120.0), ( - f"Initial still distance should be 120, got {initial_still.state}" - ) + # Wait for Phase 1 - all sensors and binary sensors have at least one value + try: + await collector.wait_for_all(timeout=3.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 1 frame. Received:\n" + f" sensor_states: {collector.sensor_states}\n" + f" binary_states: {collector.binary_states}" + ) - moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo) - assert moving_energy_entity is not None - initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key) - assert initial_me is not None and isinstance(initial_me, SensorState) - assert initial_me.state == pytest.approx(50.0), ( - f"Initial moving energy should be 50, got {initial_me.state}" - ) - - still_energy_entity = find_entity(entities, "still_energy", SensorInfo) - assert still_energy_entity is not None - initial_se = initial_state_helper.initial_states.get(still_energy_entity.key) - assert initial_se is not None and isinstance(initial_se, SensorState) - assert initial_se.state == pytest.approx(25.0), ( - f"Initial still energy should be 25, got {initial_se.state}" - ) - - # LD2412 detection_distance = moving_distance when MOVE_BITMASK is set - detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo) - assert detect_dist_entity is not None - initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key) - assert initial_dd is not None and isinstance(initial_dd, SensorState) - assert initial_dd.state == pytest.approx(100.0), ( - f"Initial detection distance should be 100, got {initial_dd.state}" - ) + # Phase 1 values: moving=100, still=120, energy=50/25, detect=100 + assert collector.sensor_states["moving_distance"][0] == pytest.approx(100.0) + assert collector.sensor_states["still_distance"][0] == pytest.approx(120.0) + assert collector.sensor_states["moving_energy"][0] == pytest.approx(50.0) + assert collector.sensor_states["still_energy"][0] == pytest.approx(25.0) + assert collector.sensor_states["detection_distance"][0] == pytest.approx(100.0) # Wait for the recovery frame (Phase 5) to be parsed # This proves the component survived garbage + truncated + overflow @@ -169,12 +124,8 @@ async def test_uart_mock_ld2412( await asyncio.wait_for(recovery_received, timeout=3.0) except TimeoutError: pytest.fail( - f"Timeout waiting for recovery frame. Received sensor states:\n" - f" moving_distance: {sensor_states['moving_distance']}\n" - f" still_distance: {sensor_states['still_distance']}\n" - f" moving_energy: {sensor_states['moving_energy']}\n" - f" still_energy: {sensor_states['still_energy']}\n" - f" detection_distance: {sensor_states['detection_distance']}" + f"Timeout waiting for recovery frame. Received:\n" + f" sensor_states: {collector.sensor_states}" ) # Verify overflow warning was logged @@ -185,67 +136,36 @@ async def test_uart_mock_ld2412( # Verify LD2412 sent setup commands (TX logging) assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock" tx_data = " ".join(tx_log_lines) - # Verify command frame header appears (FD:FC:FB:FA) assert "FD:FC:FB:FA" in tx_data, ( "Expected LD2412 command frame header FD:FC:FB:FA in TX log" ) - # Verify command frame footer appears (04:03:02:01) assert "04:03:02:01" in tx_data, ( "Expected LD2412 command frame footer 04:03:02:01 in TX log" ) - # Recovery frame values (Phase 5, after overflow) - assert len(sensor_states["moving_distance"]) >= 1, ( - f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}" - ) - # Find the recovery value (moving_distance = 50) - recovery_values = [ - v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0) - ] - assert len(recovery_values) >= 1, ( - f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}" - ) - # Recovery frame: moving=50, still=75, energy=100/80, detect=50 recovery_idx = next( i - for i, v in enumerate(sensor_states["moving_distance"]) + for i, v in enumerate(collector.sensor_states["moving_distance"]) if v == pytest.approx(50.0) ) - assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), ( - f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}" + assert collector.sensor_states["still_distance"][recovery_idx] == pytest.approx( + 75.0 ) - assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), ( - f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}" + assert collector.sensor_states["moving_energy"][recovery_idx] == pytest.approx( + 100.0 ) - assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), ( - f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}" - ) - # LD2412 detection_distance = moving_distance when MOVE_BITMASK set - assert sensor_states["detection_distance"][recovery_idx] == pytest.approx( - 50.0 - ), ( - f"Recovery detection distance should be 50, got {sensor_states['detection_distance'][recovery_idx]}" + assert collector.sensor_states["still_energy"][recovery_idx] == pytest.approx( + 80.0 ) + assert collector.sensor_states["detection_distance"][ + recovery_idx + ] == pytest.approx(50.0) - # Verify binary sensors detected targets - has_target_entity = find_entity(entities, "has_target", BinarySensorInfo) - assert has_target_entity is not None - initial_ht = initial_state_helper.initial_states.get(has_target_entity.key) - assert initial_ht is not None and isinstance(initial_ht, BinarySensorState) - assert initial_ht.state is True, "Has target should be True" - - has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo) - assert has_moving_entity is not None - initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key) - assert initial_hm is not None and isinstance(initial_hm, BinarySensorState) - assert initial_hm.state is True, "Has moving target should be True" - - has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo) - assert has_still_entity is not None - initial_hs = initial_state_helper.initial_states.get(has_still_entity.key) - assert initial_hs is not None and isinstance(initial_hs, BinarySensorState) - assert initial_hs.state is True, "Has still target should be True" + # Verify binary sensors detected targets (from Phase 1 frame) + assert collector.binary_states["has_target"][0] is True + assert collector.binary_states["has_moving_target"][0] is True + assert collector.binary_states["has_still_target"][0] is True @pytest.mark.asyncio @@ -262,120 +182,75 @@ async def test_uart_mock_ld2412_engineering( "EXTERNAL_COMPONENT_PATH", external_components_path ) - loop = asyncio.get_running_loop() - - # Track sensor state updates (after initial state is swallowed) - sensor_states: dict[str, list[float]] = { - "moving_distance": [], - "still_distance": [], - "moving_energy": [], - "still_energy": [], - "detection_distance": [], - "light": [], - "gate_0_move_energy": [], - "gate_1_move_energy": [], - "gate_2_move_energy": [], - "gate_0_still_energy": [], - "gate_1_still_energy": [], - "gate_2_still_energy": [], - } - binary_states: dict[str, list[bool]] = { - "has_target": [], - "has_moving_target": [], - "has_still_target": [], - } + collector = SensorStateCollector( + sensor_names=[ + "moving_distance", + "still_distance", + "moving_energy", + "still_energy", + "detection_distance", + "light", + "gate_0_move_energy", + "gate_1_move_energy", + "gate_2_move_energy", + "gate_0_still_energy", + "gate_1_still_energy", + "gate_2_still_energy", + ], + binary_sensor_names=[ + "has_target", + "has_moving_target", + "has_still_target", + ], + ) # Signal when we see Phase 3 frame values - phase3_still_received = loop.create_future() - phase3_detect_received = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, SensorState) and not state.missing_state: - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in sensor_states: - sensor_states[sensor_name].append(state.state) - if ( - sensor_name == "still_distance" - and state.state == pytest.approx(291.0) - and not phase3_still_received.done() - ): - phase3_still_received.set_result(True) - if ( - sensor_name == "detection_distance" - and state.state == pytest.approx(291.0) - and not phase3_detect_received.done() - ): - phase3_detect_received.set_result(True) - elif isinstance(state, BinarySensorState): - sensor_name = key_to_sensor.get(state.key) - if sensor_name and sensor_name in binary_states: - binary_states[sensor_name].append(state.state) + phase3_still_received = collector.add_waiter( + lambda: pytest.approx(291.0) in collector.sensor_states["still_distance"] + ) + phase3_detect_received = collector.add_waiter( + lambda: pytest.approx(291.0) in collector.sensor_states["detection_distance"] + ) async with ( run_compiled(yaml_config), api_client_connected() as client, ): entities, _ = await client.list_entities_services() - - all_names = list(sensor_states.keys()) + list(binary_states.keys()) - # Sort by descending length to avoid substring collisions - # (e.g., "still_energy" matching "gate_0_still_energy") - all_names.sort(key=len, reverse=True) - key_to_sensor = build_key_to_entity_mapping(entities, all_names) + collector.build_key_mapping(entities) initial_state_helper = InitialStateHelper(entities) - client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + client.subscribe_states( + initial_state_helper.on_state_wrapper(collector.on_state) + ) try: await initial_state_helper.wait_for_initial_states() except TimeoutError: pytest.fail("Timeout waiting for initial states") - # Phase 1 initial values (engineering mode frame): + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) + + # Wait for Phase 1 - all sensors and binary sensors have at least one value + try: + await collector.wait_for_all(timeout=3.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 1 frame. Received:\n" + f" sensor_states: {collector.sensor_states}\n" + f" binary_states: {collector.binary_states}" + ) + + # Phase 1 values (engineering mode frame): # moving=30, energy=100, still=30, energy=100, detect=30 - moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) - assert moving_dist_entity is not None - initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) - assert initial_moving is not None and isinstance(initial_moving, SensorState) - assert initial_moving.state == pytest.approx(30.0), ( - f"Initial moving distance should be 30, got {initial_moving.state}" - ) - - still_dist_entity = find_entity(entities, "still_distance", SensorInfo) - assert still_dist_entity is not None - initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) - assert initial_still is not None and isinstance(initial_still, SensorState) - assert initial_still.state == pytest.approx(30.0), ( - f"Initial still distance should be 30, got {initial_still.state}" - ) - - # Verify engineering mode sensors from initial state - # Gate 0 moving energy = 0x64 = 100 - gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo) - assert gate0_move_entity is not None - initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key) - assert initial_g0m is not None and isinstance(initial_g0m, SensorState) - assert initial_g0m.state == pytest.approx(100.0), ( - f"Gate 0 move energy should be 100, got {initial_g0m.state}" - ) - - # Gate 1 moving energy = 0x41 = 65 - gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo) - assert gate1_move_entity is not None - initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key) - assert initial_g1m is not None and isinstance(initial_g1m, SensorState) - assert initial_g1m.state == pytest.approx(65.0), ( - f"Gate 1 move energy should be 65, got {initial_g1m.state}" - ) - - # Light sensor = 0x57 = 87 - light_entity = find_entity(entities, "light", SensorInfo) - assert light_entity is not None - initial_light = initial_state_helper.initial_states.get(light_entity.key) - assert initial_light is not None and isinstance(initial_light, SensorState) - assert initial_light.state == pytest.approx(87.0), ( - f"Light sensor should be 87, got {initial_light.state}" - ) + assert collector.sensor_states["moving_distance"][0] == pytest.approx(30.0) + assert collector.sensor_states["still_distance"][0] == pytest.approx(30.0) + assert collector.sensor_states["gate_0_move_energy"][0] == pytest.approx(100.0) + assert collector.sensor_states["gate_1_move_energy"][0] == pytest.approx(65.0) + assert collector.sensor_states["light"][0] == pytest.approx(87.0) # Wait for Phase 3 frame: still_distance = 291cm (multi-byte) try: @@ -383,25 +258,18 @@ async def test_uart_mock_ld2412_engineering( except TimeoutError: pytest.fail( f"Timeout waiting for Phase 3 still_distance. Received:\n" - f" still_distance: {sensor_states['still_distance']}\n" - f" moving_distance: {sensor_states['moving_distance']}" + f" still_distance: {collector.sensor_states['still_distance']}" ) - assert pytest.approx(291.0) in sensor_states["still_distance"], ( - f"Expected still_distance=291, got: {sensor_states['still_distance']}" - ) + assert pytest.approx(291.0) in collector.sensor_states["still_distance"] # Wait for Phase 3: detection_distance = 291 (still-only target) - # target_state=0x02 so LD2412 uses still_distance for detection_distance. - # The throttle_with_priority filter may delay this value. try: await asyncio.wait_for(phase3_detect_received, timeout=3.0) except TimeoutError: pytest.fail( - f"Timeout waiting for detection_distance=291 (still-only target). " - f"Received: {sensor_states['detection_distance']}" + f"Timeout waiting for detection_distance=291. " + f"Received: {collector.sensor_states['detection_distance']}" ) - assert pytest.approx(291.0) in sensor_states["detection_distance"], ( - f"Expected detection_distance=291, got: {sensor_states['detection_distance']}" - ) + assert pytest.approx(291.0) in collector.sensor_states["detection_distance"] diff --git a/tests/integration/test_uart_mock_modbus.py b/tests/integration/test_uart_mock_modbus.py index 309cb56dc9..bf3c069750 100644 --- a/tests/integration/test_uart_mock_modbus.py +++ b/tests/integration/test_uart_mock_modbus.py @@ -12,10 +12,10 @@ from __future__ import annotations import asyncio from pathlib import Path -from aioesphomeapi import EntityState, SensorState +from aioesphomeapi import ButtonInfo, EntityState, SensorState import pytest -from .state_utils import InitialStateHelper, build_key_to_entity_mapping +from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity from .types import APIClientConnectedFactory, RunCompiledFunction @@ -74,6 +74,11 @@ async def test_uart_mock_modbus( except TimeoutError: pytest.fail("Timeout waiting for initial states") + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) + # Wait for basic register to be updated with successful parse try: await asyncio.wait_for(basic_register_changed, timeout=15.0) @@ -143,6 +148,11 @@ async def test_uart_mock_modbus_timing( except TimeoutError: pytest.fail("Timeout waiting for initial states") + # Start the UART mock scenario now that we're subscribed + start_btn = find_entity(entities, "start_scenario", ButtonInfo) + assert start_btn is not None, "Start Scenario button not found" + client.button_command(start_btn.key) + # Wait for voltage to be updated with successful parse try: await asyncio.wait_for(voltage_changed, timeout=15.0)