mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 14:19:03 +00:00
[ld2450] Add integration tests with mock UART (#14611)
This commit is contained in:
221
tests/integration/fixtures/uart_mock_ld2450.yaml
Normal file
221
tests/integration/fixtures/uart_mock_ld2450.yaml
Normal file
@@ -0,0 +1,221 @@
|
||||
esphome:
|
||||
name: uart-mock-ld2450-test
|
||||
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: VERBOSE
|
||||
|
||||
external_components:
|
||||
- source:
|
||||
type: local
|
||||
path: EXTERNAL_COMPONENT_PATH
|
||||
|
||||
# Dummy uart entry to satisfy ld2450's DEPENDENCIES = ["uart"]
|
||||
# The actual UART bus used is the uart_mock component below
|
||||
uart:
|
||||
baud_rate: 115200
|
||||
port: /dev/null
|
||||
|
||||
uart_mock:
|
||||
id: mock_uart
|
||||
baud_rate: 256000
|
||||
auto_start: false
|
||||
responses:
|
||||
# Catch-all response: match any command footer (04 03 02 01).
|
||||
# Returns a generic ACK to unblock setup commands.
|
||||
#
|
||||
# Response layout:
|
||||
# [0-3] FD FC FB FA = header
|
||||
# [4-5] 04 00 = length 4
|
||||
# [6] FF = cmd (handled as CMD_ENABLE_CONF)
|
||||
# [7] 01 = status (ACK)
|
||||
# [8-9] 00 00 = error = 0
|
||||
# [10-13] 04 03 02 01 = footer
|
||||
- expect_tx: [0x04, 0x03, 0x02, 0x01]
|
||||
inject_rx:
|
||||
[
|
||||
0xFD, 0xFC, 0xFB, 0xFA,
|
||||
0x04, 0x00,
|
||||
0xFF, 0x01,
|
||||
0x00, 0x00,
|
||||
0x04, 0x03, 0x02, 0x01,
|
||||
]
|
||||
|
||||
injections:
|
||||
# Phase 1 (t=100ms): Valid LD2450 periodic data frame - happy path
|
||||
# The buffer is clean at this point, so this frame should parse correctly.
|
||||
#
|
||||
# Target 1: X=-500mm, Y=1000mm, Speed=-50mm/s (approaching), Res=320mm
|
||||
# X: magnitude=500 (0x01F4), negative → high=0x01, low=0xF4
|
||||
# Y: magnitude=1000 (0x03E8), positive → high=0x83, low=0xE8
|
||||
# Speed: raw=5, negative (approaching) → high=0x00, low=0x05, decoded=-50mm/s
|
||||
# Resolution: 320 → low=0x40, high=0x01
|
||||
# Distance: sqrt(500²+1000²) = sqrt(1250000) ≈ 1118mm
|
||||
#
|
||||
# Target 2: X=200mm, Y=500mm, Speed=0 (stationary), Res=100mm
|
||||
# X: magnitude=200 (0x00C8), positive → high=0x80, low=0xC8
|
||||
# Y: magnitude=500 (0x01F4), positive → high=0x81, low=0xF4
|
||||
# Speed: 0 → 0x00, 0x00
|
||||
# Resolution: 100 → low=0x64, high=0x00
|
||||
# Distance: sqrt(200²+500²) = sqrt(290000) ≈ 538mm
|
||||
#
|
||||
# Target 3: No target (all zeros)
|
||||
# Distance: 0 → sensors publish unknown/NaN
|
||||
#
|
||||
# Counts: target_count=2, moving_target_count=1, still_target_count=1
|
||||
#
|
||||
# Frame layout (30 bytes):
|
||||
# [0-3] AA FF 03 00 = periodic data header
|
||||
# [4-11] Target 1 (8 bytes): X_L X_H Y_L Y_H SPD_L SPD_H RES_L RES_H
|
||||
# [12-19] Target 2 (8 bytes)
|
||||
# [20-27] Target 3 (8 bytes)
|
||||
# [28-29] 55 CC = periodic data footer
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xAA, 0xFF, 0x03, 0x00,
|
||||
0xF4, 0x01, 0xE8, 0x83, 0x05, 0x00, 0x40, 0x01,
|
||||
0xC8, 0x80, 0xF4, 0x81, 0x00, 0x00, 0x64, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x55, 0xCC,
|
||||
]
|
||||
|
||||
# Phase 2 (t=300ms): Garbage bytes
|
||||
# LD2450's readline_ does NOT reject bytes at position 0 (unlike LD2412),
|
||||
# so these bytes accumulate in the buffer. buffer_pos_ goes from 0 to 7.
|
||||
- delay: 200ms
|
||||
inject_rx: [0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11, 0x22]
|
||||
|
||||
# Phase 3 (t=400ms): Truncated frame (header + partial data, no footer)
|
||||
# More bytes accumulating in the buffer without a footer match.
|
||||
# After this, buffer_pos_ = 7 + 8 = 15.
|
||||
- delay: 100ms
|
||||
inject_rx: [0xAA, 0xFF, 0x03, 0x00, 0x01, 0x02, 0x03, 0x04]
|
||||
|
||||
# Phase 4 (t=600ms): Overflow - inject 75 bytes of 0xFF (MAX_LINE_LENGTH=45)
|
||||
# Buffer has 15 bytes from phases 2+3.
|
||||
# readline_() stores bytes while buffer_pos_ < 44. When buffer_pos_ == 44,
|
||||
# the next byte triggers overflow: logs warning, resets buffer_pos_ to 0,
|
||||
# and discards that byte.
|
||||
#
|
||||
# First overflow: 29 bytes fill positions 15-43 (buffer_pos_=44), byte 30
|
||||
# triggers overflow (discarded). Total consumed: 30 bytes.
|
||||
# Second overflow: 44 bytes fill positions 0-43 (buffer_pos_=44), byte 45
|
||||
# triggers overflow (discarded). Total consumed: 30+45 = 75 bytes.
|
||||
# After both overflows, buffer_pos_ = 0 (clean state for recovery frame).
|
||||
- delay: 200ms
|
||||
inject_rx:
|
||||
[
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
]
|
||||
|
||||
# Phase 5 (t=700ms): Valid frame after overflow - recovery test
|
||||
# Buffer was reset by overflow. This valid frame should parse correctly.
|
||||
#
|
||||
# Target 1: X=300mm, Y=400mm, Speed=30mm/s (moving away), Res=100mm
|
||||
# X: magnitude=300 (0x012C), positive → high=0x81, low=0x2C
|
||||
# Y: magnitude=400 (0x0190), positive → high=0x81, low=0x90
|
||||
# Speed: raw=3, positive (moving away) → high=0x80, low=0x03, decoded=30mm/s
|
||||
# Resolution: 100 → low=0x64, high=0x00
|
||||
# Distance: sqrt(300²+400²) = 500mm
|
||||
#
|
||||
# Target 2 & 3: No target (all zeros)
|
||||
# Counts: target_count=1, moving_target_count=1, still_target_count=0
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xAA, 0xFF, 0x03, 0x00,
|
||||
0x2C, 0x81, 0x90, 0x81, 0x03, 0x80, 0x64, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x55, 0xCC,
|
||||
]
|
||||
|
||||
ld2450:
|
||||
id: ld2450_dev
|
||||
uart_id: mock_uart
|
||||
|
||||
sensor:
|
||||
- platform: ld2450
|
||||
ld2450_id: ld2450_dev
|
||||
target_count:
|
||||
name: "Target Count"
|
||||
filters: &sensor_filters
|
||||
- timeout:
|
||||
timeout: 50ms
|
||||
value: last
|
||||
- throttle_with_priority: 50ms
|
||||
still_target_count:
|
||||
name: "Still Target Count"
|
||||
filters: *sensor_filters
|
||||
moving_target_count:
|
||||
name: "Moving Target Count"
|
||||
filters: *sensor_filters
|
||||
target_1:
|
||||
x:
|
||||
name: "Target 1 X"
|
||||
filters: *sensor_filters
|
||||
y:
|
||||
name: "Target 1 Y"
|
||||
filters: *sensor_filters
|
||||
speed:
|
||||
name: "Target 1 Speed"
|
||||
filters: *sensor_filters
|
||||
distance:
|
||||
name: "Target 1 Distance"
|
||||
filters: *sensor_filters
|
||||
resolution:
|
||||
name: "Target 1 Resolution"
|
||||
filters: *sensor_filters
|
||||
angle:
|
||||
name: "Target 1 Angle"
|
||||
filters: *sensor_filters
|
||||
target_2:
|
||||
x:
|
||||
name: "Target 2 X"
|
||||
filters: *sensor_filters
|
||||
y:
|
||||
name: "Target 2 Y"
|
||||
filters: *sensor_filters
|
||||
speed:
|
||||
name: "Target 2 Speed"
|
||||
filters: *sensor_filters
|
||||
distance:
|
||||
name: "Target 2 Distance"
|
||||
filters: *sensor_filters
|
||||
|
||||
binary_sensor:
|
||||
- platform: ld2450
|
||||
ld2450_id: ld2450_dev
|
||||
has_target:
|
||||
name: "Has Target"
|
||||
filters: &binary_sensor_filters
|
||||
- settle: 50ms
|
||||
has_moving_target:
|
||||
name: "Has Moving Target"
|
||||
filters: *binary_sensor_filters
|
||||
has_still_target:
|
||||
name: "Has Still Target"
|
||||
filters: *binary_sensor_filters
|
||||
|
||||
text_sensor:
|
||||
- platform: ld2450
|
||||
ld2450_id: ld2450_dev
|
||||
target_1:
|
||||
direction:
|
||||
name: "Target 1 Direction"
|
||||
|
||||
button:
|
||||
- platform: template
|
||||
name: "Start Scenario"
|
||||
id: start_scenario_btn
|
||||
on_press:
|
||||
- lambda: 'id(mock_uart).start_scenario();'
|
||||
@@ -13,6 +13,7 @@ from aioesphomeapi import (
|
||||
EntityInfo,
|
||||
EntityState,
|
||||
SensorState,
|
||||
TextSensorState,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -244,12 +245,13 @@ class InitialStateHelper:
|
||||
|
||||
|
||||
class SensorStateCollector:
|
||||
"""Collects sensor and binary sensor state updates and provides wait helpers.
|
||||
"""Collects sensor, binary sensor, and text sensor state updates with wait helpers.
|
||||
|
||||
Usage:
|
||||
collector = SensorStateCollector(
|
||||
sensor_names=["moving_distance", "still_distance"],
|
||||
binary_sensor_names=["has_target"],
|
||||
text_sensor_names=["direction"],
|
||||
)
|
||||
# Use collector.on_state as the callback (or wrap it)
|
||||
client.subscribe_states(helper.on_state_wrapper(collector.on_state))
|
||||
@@ -259,18 +261,23 @@ class SensorStateCollector:
|
||||
|
||||
# Access collected states
|
||||
assert collector.sensor_states["moving_distance"][0] == approx(100.0)
|
||||
assert collector.text_sensor_states["direction"][0] == "Approaching"
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sensor_names: list[str],
|
||||
binary_sensor_names: list[str] | None = None,
|
||||
text_sensor_names: list[str] | None = None,
|
||||
entities: list[EntityInfo] | None = None,
|
||||
) -> None:
|
||||
self.sensor_states: dict[str, list[float]] = {name: [] for name in sensor_names}
|
||||
self.binary_states: dict[str, list[bool]] = {
|
||||
name: [] for name in (binary_sensor_names or [])
|
||||
}
|
||||
self.text_sensor_states: dict[str, list[str]] = {
|
||||
name: [] for name in (text_sensor_names or [])
|
||||
}
|
||||
self._key_to_sensor: dict[int, str] = {}
|
||||
self._waiters: list[tuple[Callable[[], bool], asyncio.Future[bool]]] = []
|
||||
|
||||
@@ -279,7 +286,11 @@ class SensorStateCollector:
|
||||
|
||||
def build_key_mapping(self, entities: list[EntityInfo]) -> None:
|
||||
"""Build key-to-name mapping from entities. Sorted by descending length."""
|
||||
all_names = list(self.sensor_states.keys()) + list(self.binary_states.keys())
|
||||
all_names = (
|
||||
list(self.sensor_states.keys())
|
||||
+ list(self.binary_states.keys())
|
||||
+ list(self.text_sensor_states.keys())
|
||||
)
|
||||
all_names.sort(key=len, reverse=True)
|
||||
self._key_to_sensor = build_key_to_entity_mapping(entities, all_names)
|
||||
|
||||
@@ -295,6 +306,11 @@ class SensorStateCollector:
|
||||
if sensor_name and sensor_name in self.binary_states:
|
||||
self.binary_states[sensor_name].append(state.state)
|
||||
self._check_waiters()
|
||||
elif isinstance(state, TextSensorState) and not state.missing_state:
|
||||
sensor_name = self._key_to_sensor.get(state.key)
|
||||
if sensor_name and sensor_name in self.text_sensor_states:
|
||||
self.text_sensor_states[sensor_name].append(state.state)
|
||||
self._check_waiters()
|
||||
|
||||
def _check_waiters(self) -> None:
|
||||
"""Check all pending waiters and resolve any whose condition is met."""
|
||||
@@ -303,9 +319,11 @@ class SensorStateCollector:
|
||||
future.set_result(True)
|
||||
|
||||
def _all_have_values(self) -> bool:
|
||||
"""Check if all sensor and binary sensor lists have at least one value."""
|
||||
return all(len(v) >= 1 for v in self.sensor_states.values()) and all(
|
||||
len(v) >= 1 for v in self.binary_states.values()
|
||||
"""Check if all sensor, binary sensor, and text sensor lists have at least one value."""
|
||||
return (
|
||||
all(len(v) >= 1 for v in self.sensor_states.values())
|
||||
and all(len(v) >= 1 for v in self.binary_states.values())
|
||||
and all(len(v) >= 1 for v in self.text_sensor_states.values())
|
||||
)
|
||||
|
||||
async def wait_for_all(self, timeout: float = 3.0) -> None:
|
||||
|
||||
204
tests/integration/test_uart_mock_ld2450.py
Normal file
204
tests/integration/test_uart_mock_ld2450.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""Integration test for LD2450 component with mock UART.
|
||||
|
||||
Tests:
|
||||
test_uart_mock_ld2450:
|
||||
1. Happy path - valid periodic data frame publishes correct target sensor values
|
||||
2. Multi-target tracking - verifies target count, moving/still counts
|
||||
3. Target coordinate decoding - signed X/Y coordinates with sign-magnitude encoding
|
||||
4. Speed decoding - approaching (negative) and stationary (zero) targets
|
||||
5. Distance calculation - computed from X/Y via sqrt(x²+y²)
|
||||
6. Direction text sensor - "Approaching" for negative speed target
|
||||
7. Garbage resilience - random bytes don't crash the component
|
||||
8. Truncated frame handling - partial frame doesn't corrupt state
|
||||
9. Buffer overflow recovery - overflow resets the parser
|
||||
10. Post-overflow parsing - next valid frame after overflow is parsed correctly
|
||||
11. TX logging - verifies LD2450 sends expected setup commands
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
from aioesphomeapi import ButtonInfo
|
||||
import pytest
|
||||
|
||||
from .state_utils import InitialStateHelper, SensorStateCollector, find_entity
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uart_mock_ld2450(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test LD2450 data parsing with happy path, garbage, overflow, and recovery."""
|
||||
# Replace external component path placeholder
|
||||
external_components_path = str(
|
||||
Path(__file__).parent / "fixtures" / "external_components"
|
||||
)
|
||||
yaml_config = yaml_config.replace(
|
||||
"EXTERNAL_COMPONENT_PATH", external_components_path
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# Track overflow warning in logs
|
||||
overflow_seen = loop.create_future()
|
||||
|
||||
# Track TX data logged by the mock for assertions
|
||||
tx_log_lines: list[str] = []
|
||||
|
||||
def line_callback(line: str) -> None:
|
||||
if "Max command length exceeded" in line and not overflow_seen.done():
|
||||
overflow_seen.set_result(True)
|
||||
# Capture all TX log lines from uart_mock
|
||||
if "uart_mock" in line and "TX " in line:
|
||||
tx_log_lines.append(line)
|
||||
|
||||
collector = SensorStateCollector(
|
||||
sensor_names=[
|
||||
"target_1_x",
|
||||
"target_1_y",
|
||||
"target_1_speed",
|
||||
"target_1_distance",
|
||||
"target_1_resolution",
|
||||
"target_1_angle",
|
||||
"target_2_x",
|
||||
"target_2_y",
|
||||
"target_2_speed",
|
||||
"target_2_distance",
|
||||
"target_count",
|
||||
"still_target_count",
|
||||
"moving_target_count",
|
||||
],
|
||||
binary_sensor_names=[
|
||||
"has_target",
|
||||
"has_moving_target",
|
||||
"has_still_target",
|
||||
],
|
||||
text_sensor_names=[
|
||||
"target_1_direction",
|
||||
],
|
||||
)
|
||||
|
||||
# Signal when we see recovery frame values (target 1 distance ≈ 500mm)
|
||||
recovery_received = collector.add_waiter(
|
||||
lambda: (
|
||||
pytest.approx(500.0, abs=1.0)
|
||||
in collector.sensor_states["target_1_distance"]
|
||||
)
|
||||
)
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config, line_callback=line_callback),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
entities, _ = await client.list_entities_services()
|
||||
collector.build_key_mapping(entities)
|
||||
|
||||
# Set up initial state helper
|
||||
initial_state_helper = InitialStateHelper(entities)
|
||||
client.subscribe_states(
|
||||
initial_state_helper.on_state_wrapper(collector.on_state)
|
||||
)
|
||||
|
||||
try:
|
||||
await initial_state_helper.wait_for_initial_states()
|
||||
except TimeoutError:
|
||||
pytest.fail("Timeout waiting for initial states")
|
||||
|
||||
# Start the UART mock scenario now that we're subscribed
|
||||
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
|
||||
assert start_btn is not None, "Start Scenario button not found"
|
||||
client.button_command(start_btn.key)
|
||||
|
||||
# Wait for Phase 1 - all sensors and binary sensors have at least one value
|
||||
try:
|
||||
await collector.wait_for_all(timeout=5.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Timeout waiting for Phase 1 frame. Received:\n"
|
||||
f" sensor_states: {collector.sensor_states}\n"
|
||||
f" binary_states: {collector.binary_states}\n"
|
||||
f" text_states: {collector.text_sensor_states}"
|
||||
)
|
||||
|
||||
# Phase 1 values:
|
||||
# Target 1: X=-500, Y=1000, Speed=-50 (approaching), Res=320
|
||||
# Distance = sqrt(500²+1000²) ≈ 1118mm
|
||||
assert collector.sensor_states["target_1_x"][0] == pytest.approx(-500.0)
|
||||
assert collector.sensor_states["target_1_y"][0] == pytest.approx(1000.0)
|
||||
assert collector.sensor_states["target_1_speed"][0] == pytest.approx(-50.0)
|
||||
assert collector.sensor_states["target_1_resolution"][0] == pytest.approx(320.0)
|
||||
# Distance computed from X/Y
|
||||
assert collector.sensor_states["target_1_distance"][0] == pytest.approx(
|
||||
1118.0, abs=1.0
|
||||
)
|
||||
|
||||
# Target 2: X=200, Y=500, Speed=0 (stationary), Res=100
|
||||
# Distance = sqrt(200²+500²) ≈ 538mm
|
||||
assert collector.sensor_states["target_2_x"][0] == pytest.approx(200.0)
|
||||
assert collector.sensor_states["target_2_y"][0] == pytest.approx(500.0)
|
||||
assert collector.sensor_states["target_2_speed"][0] == pytest.approx(0.0)
|
||||
assert collector.sensor_states["target_2_distance"][0] == pytest.approx(
|
||||
538.0, abs=1.0
|
||||
)
|
||||
|
||||
# Target counts: 2 targets total, 1 moving, 1 still
|
||||
assert collector.sensor_states["target_count"][0] == pytest.approx(2.0)
|
||||
assert collector.sensor_states["moving_target_count"][0] == pytest.approx(1.0)
|
||||
assert collector.sensor_states["still_target_count"][0] == pytest.approx(1.0)
|
||||
|
||||
# Binary sensors: all true (targets detected)
|
||||
assert collector.binary_states["has_target"][0] is True
|
||||
assert collector.binary_states["has_moving_target"][0] is True
|
||||
assert collector.binary_states["has_still_target"][0] is True
|
||||
|
||||
# Direction text sensor: Target 1 is approaching (speed < 0)
|
||||
assert collector.text_sensor_states["target_1_direction"][0] == "Approaching"
|
||||
|
||||
# Wait for the recovery frame (Phase 5) to be parsed
|
||||
# This proves the component survived garbage + truncated + overflow
|
||||
try:
|
||||
await asyncio.wait_for(recovery_received, timeout=5.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Timeout waiting for recovery frame. Received:\n"
|
||||
f" sensor_states: {collector.sensor_states}"
|
||||
)
|
||||
|
||||
# Verify overflow warning was logged
|
||||
assert overflow_seen.done(), (
|
||||
"Expected 'Max command length exceeded' warning in logs"
|
||||
)
|
||||
|
||||
# Verify LD2450 sent setup commands (TX logging)
|
||||
assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock"
|
||||
tx_data = " ".join(tx_log_lines)
|
||||
# Verify command frame header appears (FD:FC:FB:FA)
|
||||
assert "FD:FC:FB:FA" in tx_data, (
|
||||
"Expected LD2450 command frame header FD:FC:FB:FA in TX log"
|
||||
)
|
||||
# Verify command frame footer appears (04:03:02:01)
|
||||
assert "04:03:02:01" in tx_data, (
|
||||
"Expected LD2450 command frame footer 04:03:02:01 in TX log"
|
||||
)
|
||||
|
||||
# Recovery frame values (Phase 5, after overflow):
|
||||
# Target 1: X=300, Y=400, Distance=500, Speed=30 (moving away)
|
||||
# target_count=1, moving=1, still=0
|
||||
#
|
||||
# Note: throttle filters cause sensor lists to have different lengths,
|
||||
# so we check each value appeared somewhere rather than using a shared index.
|
||||
assert (
|
||||
pytest.approx(500.0, abs=1.0)
|
||||
in collector.sensor_states["target_1_distance"]
|
||||
)
|
||||
assert pytest.approx(300.0) in collector.sensor_states["target_1_x"]
|
||||
assert pytest.approx(400.0) in collector.sensor_states["target_1_y"]
|
||||
assert pytest.approx(30.0) in collector.sensor_states["target_1_speed"]
|
||||
assert pytest.approx(1.0) in collector.sensor_states["target_count"]
|
||||
assert pytest.approx(1.0) in collector.sensor_states["moving_target_count"]
|
||||
assert pytest.approx(0.0) in collector.sensor_states["still_target_count"]
|
||||
Reference in New Issue
Block a user