From 80a2acca4f576b5c43fa0a87b5752a68e1d85b6c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 1 Mar 2026 18:19:32 -1000 Subject: [PATCH] [ld2410] Add UART mock integration test for LD2410 component (#14377) --- .../external_components/uart_mock/__init__.py | 86 ++++ .../uart_mock/uart_mock.cpp | 159 +++++++ .../external_components/uart_mock/uart_mock.h | 78 ++++ .../fixtures/uart_mock_ld2410.yaml | 145 +++++++ .../uart_mock_ld2410_engineering.yaml | 156 +++++++ tests/integration/test_uart_mock_ld2410.py | 392 ++++++++++++++++++ 6 files changed, 1016 insertions(+) create mode 100644 tests/integration/fixtures/external_components/uart_mock/__init__.py create mode 100644 tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp create mode 100644 tests/integration/fixtures/external_components/uart_mock/uart_mock.h create mode 100644 tests/integration/fixtures/uart_mock_ld2410.yaml create mode 100644 tests/integration/fixtures/uart_mock_ld2410_engineering.yaml create mode 100644 tests/integration/test_uart_mock_ld2410.py diff --git a/tests/integration/fixtures/external_components/uart_mock/__init__.py b/tests/integration/fixtures/external_components/uart_mock/__init__.py new file mode 100644 index 0000000000..5bd879ab12 --- /dev/null +++ b/tests/integration/fixtures/external_components/uart_mock/__init__.py @@ -0,0 +1,86 @@ +import esphome.codegen as cg +from esphome.components import uart +from esphome.components.uart import CONF_DATA_BITS, CONF_PARITY, CONF_STOP_BITS +import esphome.config_validation as cv +from esphome.const import CONF_BAUD_RATE, CONF_DATA, CONF_DELAY, CONF_ID, CONF_INTERVAL + +CODEOWNERS = ["@esphome/tests"] +MULTI_CONF = True + +uart_mock_ns = cg.esphome_ns.namespace("uart_mock") +MockUartComponent = uart_mock_ns.class_( + "MockUartComponent", uart.UARTComponent, cg.Component +) + +CONF_INJECTIONS = "injections" +CONF_RESPONSES = "responses" +CONF_INJECT_RX = "inject_rx" +CONF_EXPECT_TX = "expect_tx" +CONF_PERIODIC_RX = "periodic_rx" + +UART_PARITY_OPTIONS = { + "NONE": uart.UARTParityOptions.UART_CONFIG_PARITY_NONE, + "EVEN": uart.UARTParityOptions.UART_CONFIG_PARITY_EVEN, + "ODD": uart.UARTParityOptions.UART_CONFIG_PARITY_ODD, +} + +INJECTION_SCHEMA = cv.Schema( + { + cv.Required(CONF_INJECT_RX): [cv.hex_uint8_t], + cv.Optional(CONF_DELAY, default="0ms"): cv.positive_time_period_milliseconds, + } +) + +RESPONSE_SCHEMA = cv.Schema( + { + cv.Required(CONF_EXPECT_TX): [cv.hex_uint8_t], + cv.Required(CONF_INJECT_RX): [cv.hex_uint8_t], + } +) + +PERIODIC_RX_SCHEMA = cv.Schema( + { + cv.Required(CONF_DATA): [cv.hex_uint8_t], + cv.Required(CONF_INTERVAL): cv.positive_time_period_milliseconds, + } +) + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(MockUartComponent), + cv.Required(CONF_BAUD_RATE): cv.int_range(min=1), + cv.Optional(CONF_STOP_BITS, default=1): cv.one_of(1, 2, int=True), + cv.Optional(CONF_DATA_BITS, default=8): cv.int_range(min=5, max=8), + cv.Optional(CONF_PARITY, default="NONE"): cv.enum( + UART_PARITY_OPTIONS, upper=True + ), + cv.Optional(CONF_INJECTIONS, default=[]): cv.ensure_list(INJECTION_SCHEMA), + cv.Optional(CONF_RESPONSES, default=[]): cv.ensure_list(RESPONSE_SCHEMA), + cv.Optional(CONF_PERIODIC_RX, default=[]): cv.ensure_list(PERIODIC_RX_SCHEMA), + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) + + cg.add(var.set_baud_rate(config[CONF_BAUD_RATE])) + cg.add(var.set_stop_bits(config[CONF_STOP_BITS])) + cg.add(var.set_data_bits(config[CONF_DATA_BITS])) + cg.add(var.set_parity(config[CONF_PARITY])) + + for injection in config[CONF_INJECTIONS]: + rx_data = injection[CONF_INJECT_RX] + delay_ms = injection[CONF_DELAY] + cg.add(var.add_injection(rx_data, delay_ms)) + + for response in config[CONF_RESPONSES]: + tx_data = response[CONF_EXPECT_TX] + rx_data = response[CONF_INJECT_RX] + cg.add(var.add_response(tx_data, rx_data)) + + for periodic in config[CONF_PERIODIC_RX]: + data = periodic[CONF_DATA] + interval = periodic[CONF_INTERVAL] + cg.add(var.add_periodic_rx(data, interval)) diff --git a/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp b/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp new file mode 100644 index 0000000000..a49d51ba10 --- /dev/null +++ b/tests/integration/fixtures/external_components/uart_mock/uart_mock.cpp @@ -0,0 +1,159 @@ +// Host-only test component — do not copy to production code. +// See uart_mock.h for details. + +#include "uart_mock.h" +#include "esphome/core/application.h" +#include "esphome/core/helpers.h" +#include "esphome/core/log.h" + +namespace esphome::uart_mock { + +static const char *const TAG = "uart_mock"; + +void MockUartComponent::setup() { + ESP_LOGI(TAG, "Mock UART initialized with %zu injections, %zu responses, %zu periodic", this->injections_.size(), + this->responses_.size(), this->periodic_rx_.size()); +} + +void MockUartComponent::loop() { + uint32_t now = App.get_loop_component_start_time(); + + // Initialize scenario start time on first loop() call, after all components have + // finished setup(). This prevents injection delays from being consumed during setup. + if (!this->loop_started_) { + this->loop_started_ = true; + this->scenario_start_ms_ = now; + this->cumulative_delay_ms_ = 0; + ESP_LOGD(TAG, "Scenario started at %u ms", now); + } + + // Process at most ONE timed injection per loop iteration. + // This ensures each injection is in a separate loop cycle, giving the consuming + // component (e.g., LD2410) a chance to process each batch independently. + if (this->injection_index_ < this->injections_.size()) { + auto &injection = this->injections_[this->injection_index_]; + uint32_t target_time = this->scenario_start_ms_ + this->cumulative_delay_ms_ + injection.delay_ms; + if (now >= target_time) { + ESP_LOGD(TAG, "Injecting %zu RX bytes (injection %u)", injection.rx_data.size(), this->injection_index_); + this->inject_to_rx_buffer_(injection.rx_data); + this->cumulative_delay_ms_ += injection.delay_ms; + this->injection_index_++; + } + } + + // Process periodic RX + for (auto &periodic : this->periodic_rx_) { + if (now - periodic.last_inject_ms >= periodic.interval_ms) { + this->inject_to_rx_buffer_(periodic.data); + periodic.last_inject_ms = now; + } + } +} + +void MockUartComponent::dump_config() { + ESP_LOGCONFIG(TAG, + "Mock UART Component:\n" + " Baud Rate: %u\n" + " Injections: %zu\n" + " Responses: %zu\n" + " Periodic RX: %zu", + this->baud_rate_, this->injections_.size(), this->responses_.size(), this->periodic_rx_.size()); +} + +void MockUartComponent::write_array(const uint8_t *data, size_t len) { + this->tx_count_ += len; + this->tx_buffer_.insert(this->tx_buffer_.end(), data, data + len); + + // Log all TX data so tests can verify what the component sends + if (len > 0 && len <= 64) { + char hex_buf[format_hex_pretty_size(64)]; + ESP_LOGD(TAG, "TX %zu bytes: %s", len, format_hex_pretty_to(hex_buf, sizeof(hex_buf), data, len)); + } else if (len > 64) { + ESP_LOGD(TAG, "TX %zu bytes (too large to log)", len); + } + +#ifdef USE_UART_DEBUGGER + for (size_t i = 0; i < len; i++) { + this->debug_callback_.call(uart::UART_DIRECTION_TX, data[i]); + } +#endif + + this->try_match_response_(); +} + +bool MockUartComponent::peek_byte(uint8_t *data) { + if (this->rx_buffer_.empty()) { + return false; + } + *data = this->rx_buffer_.front(); + return true; +} + +bool MockUartComponent::read_array(uint8_t *data, size_t len) { + if (this->rx_buffer_.size() < len) { + return false; + } + for (size_t i = 0; i < len; i++) { + data[i] = this->rx_buffer_.front(); + this->rx_buffer_.pop_front(); + } + this->rx_count_ += len; + +#ifdef USE_UART_DEBUGGER + for (size_t i = 0; i < len; i++) { + this->debug_callback_.call(uart::UART_DIRECTION_RX, data[i]); + } +#endif + + return true; +} + +size_t MockUartComponent::available() { return this->rx_buffer_.size(); } + +void MockUartComponent::flush() { + // Nothing to flush in mock +} + +void MockUartComponent::add_injection(const std::vector &rx_data, uint32_t delay_ms) { + this->injections_.push_back({rx_data, delay_ms}); +} + +void MockUartComponent::add_response(const std::vector &expect_tx, const std::vector &inject_rx) { + this->responses_.push_back({expect_tx, inject_rx}); +} + +void MockUartComponent::add_periodic_rx(const std::vector &data, uint32_t interval_ms) { + this->periodic_rx_.push_back({data, interval_ms, 0}); +} + +void MockUartComponent::try_match_response_() { + for (auto &response : this->responses_) { + if (this->tx_buffer_.size() < response.expect_tx.size()) { + continue; + } + // Check if tx_buffer_ ends with expect_tx + size_t offset = this->tx_buffer_.size() - response.expect_tx.size(); + if (std::equal(response.expect_tx.begin(), response.expect_tx.end(), this->tx_buffer_.begin() + offset)) { + ESP_LOGD(TAG, "TX match found, injecting %zu RX bytes", response.inject_rx.size()); + this->inject_to_rx_buffer_(response.inject_rx); + this->tx_buffer_.clear(); + return; + } + } +} + +void MockUartComponent::inject_to_rx_buffer_(const std::vector &data) { + // Log injected RX data so tests can see what's being fed to the component + if (!data.empty() && data.size() <= 64) { + char hex_buf[format_hex_pretty_size(64)]; + ESP_LOGD(TAG, "RX inject %zu bytes: %s", data.size(), + format_hex_pretty_to(hex_buf, sizeof(hex_buf), data.data(), data.size())); + } else if (data.size() > 64) { + ESP_LOGD(TAG, "RX inject %zu bytes (too large to log inline)", data.size()); + } + for (uint8_t byte : data) { + this->rx_buffer_.push_back(byte); + } +} + +} // namespace esphome::uart_mock diff --git a/tests/integration/fixtures/external_components/uart_mock/uart_mock.h b/tests/integration/fixtures/external_components/uart_mock/uart_mock.h new file mode 100644 index 0000000000..00f4bf2e2b --- /dev/null +++ b/tests/integration/fixtures/external_components/uart_mock/uart_mock.h @@ -0,0 +1,78 @@ +#pragma once + +// ============================================================================ +// HOST-ONLY TEST COMPONENT — DO NOT COPY TO PRODUCTION CODE +// +// This component runs exclusively on the host platform for integration testing. +// It intentionally uses std::vector, std::deque, and dynamic allocation which +// would be inappropriate for production embedded components. Do not use this +// code as a reference for writing ESPHome components targeting real hardware. +// ============================================================================ + +#include "esphome/core/component.h" +#include "esphome/components/uart/uart_component.h" +#include +#include + +namespace esphome::uart_mock { + +class MockUartComponent : public uart::UARTComponent, public Component { + public: + void setup() override; + void loop() override; + void dump_config() override; + float get_setup_priority() const override { return setup_priority::BUS; } + + // UARTComponent interface + void write_array(const uint8_t *data, size_t len) override; + bool peek_byte(uint8_t *data) override; + bool read_array(uint8_t *data, size_t len) override; + size_t available() override; + void flush() override; + + // Scenario configuration - called from generated code + void add_injection(const std::vector &rx_data, uint32_t delay_ms); + void add_response(const std::vector &expect_tx, const std::vector &inject_rx); + void add_periodic_rx(const std::vector &data, uint32_t interval_ms); + + protected: + void check_logger_conflict() override {} + void try_match_response_(); + void inject_to_rx_buffer_(const std::vector &data); + + // Timed injections + struct Injection { + std::vector rx_data; + uint32_t delay_ms; + }; + std::vector injections_; + uint32_t injection_index_{0}; + uint32_t scenario_start_ms_{0}; + uint32_t cumulative_delay_ms_{0}; + bool loop_started_{false}; + + // TX-triggered responses + struct Response { + std::vector expect_tx; + std::vector inject_rx; + }; + std::vector responses_; + std::vector tx_buffer_; + + // RX buffer + std::deque rx_buffer_; + + // Periodic RX + struct PeriodicRx { + std::vector data; + uint32_t interval_ms; + uint32_t last_inject_ms{0}; + }; + std::vector periodic_rx_; + + // Observability + uint32_t tx_count_{0}; + uint32_t rx_count_{0}; +}; + +} // namespace esphome::uart_mock diff --git a/tests/integration/fixtures/uart_mock_ld2410.yaml b/tests/integration/fixtures/uart_mock_ld2410.yaml new file mode 100644 index 0000000000..9a81468263 --- /dev/null +++ b/tests/integration/fixtures/uart_mock_ld2410.yaml @@ -0,0 +1,145 @@ +esphome: + name: uart-mock-ld2410-test + +host: +api: +logger: + level: VERBOSE + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + +# Dummy uart entry to satisfy ld2410's DEPENDENCIES = ["uart"] +# The actual UART bus used is the uart_mock component below +uart: + baud_rate: 115200 + port: /dev/null + +uart_mock: + id: mock_uart + baud_rate: 256000 + injections: + # Phase 1 (t=100ms): Valid LD2410 normal mode data frame - happy path + # The buffer is clean at this point, so this frame should parse correctly. + # Moving target: 100cm, energy 50 + # Still target: 120cm, energy 25 + # Detection distance: 300cm + # Target state: 0x03 (moving + still) + # + # Note: LD2410's two_byte_to_int() uses signed char, so low bytes must be + # <=127 to produce correct values. Values >127 wrap negative. + # + # Frame layout (24 bytes): + # [0-3] F4 F3 F2 F1 = data frame header + # [4-5] 0D 00 = length 13 + # [6] 02 = data type (normal) + # [7] AA = data header marker + # [8] 03 = target states (moving+still) + # [9-10] 64 00 = moving distance 100 (0x0064) + # [11] 32 = moving energy 50 + # [12-13] 78 00 = still distance 120 (0x0078) + # [14] 19 = still energy 25 + # [15-16] 2C 01 = detection distance 300 (0x012C) + # [17] 00 = padding + # [18] 55 = data footer marker + # [19] 00 = CRC/check + # [20-23] F8 F7 F6 F5 = data frame footer + - delay: 100ms + inject_rx: + [ + 0xF4, 0xF3, 0xF2, 0xF1, + 0x0D, 0x00, + 0x02, 0xAA, + 0x03, + 0x64, 0x00, + 0x32, + 0x78, 0x00, + 0x19, + 0x2C, 0x01, + 0x00, + 0x55, 0x00, + 0xF8, 0xF7, 0xF6, 0xF5, + ] + + # Phase 2 (t=200ms): Garbage bytes - corrupt the buffer + # These random bytes will accumulate in the LD2410's internal buffer. + # No footer will be found, so the buffer just grows. + - delay: 100ms + inject_rx: [0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11, 0x22] + + # Phase 3 (t=300ms): Truncated frame (header + partial data, no footer) + # More bytes accumulating in the buffer without a footer match. + - delay: 100ms + inject_rx: [0xF4, 0xF3, 0xF2, 0xF1, 0x0D, 0x00, 0x02, 0xAA] + + # Phase 4 (t=500ms): Overflow - inject 85 bytes of 0xFF (MAX_LINE_LENGTH=50) + # Buffer has 15 bytes from phases 2+3. + # Overflow math: need 35 bytes to trigger first overflow (pos 15->49), + # then 50 more to trigger second overflow (pos 0->49). Total = 85 bytes. + # After two overflows, buffer_pos_ = 0 with a completely clean buffer. + # The LD2410 logs "Max command length exceeded" at each overflow. + - delay: 200ms + inject_rx: + [ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + ] + + # Phase 5 (t=600ms): Valid frame after overflow - recovery test + # Buffer was reset by overflow. This valid frame should parse correctly. + # Moving target: 50cm, energy 100 + # Still target: 75cm, energy 80 + # Detection distance: 127cm + - delay: 100ms + inject_rx: + [ + 0xF4, 0xF3, 0xF2, 0xF1, + 0x0D, 0x00, + 0x02, 0xAA, + 0x03, + 0x32, 0x00, + 0x64, + 0x4B, 0x00, + 0x50, + 0x7F, 0x00, + 0x00, + 0x55, 0x00, + 0xF8, 0xF7, 0xF6, 0xF5, + ] + +ld2410: + id: ld2410_dev + uart_id: mock_uart + +sensor: + - platform: ld2410 + ld2410_id: ld2410_dev + moving_distance: + name: "Moving Distance" + still_distance: + name: "Still Distance" + moving_energy: + name: "Moving Energy" + still_energy: + name: "Still Energy" + detection_distance: + name: "Detection Distance" + +binary_sensor: + - platform: ld2410 + ld2410_id: ld2410_dev + has_target: + name: "Has Target" + has_moving_target: + name: "Has Moving Target" + has_still_target: + name: "Has Still Target" diff --git a/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml b/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml new file mode 100644 index 0000000000..3b730fc1f8 --- /dev/null +++ b/tests/integration/fixtures/uart_mock_ld2410_engineering.yaml @@ -0,0 +1,156 @@ +esphome: + name: uart-mock-ld2410-eng-test + +host: +api: +logger: + level: VERBOSE + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + +# Dummy uart entry to satisfy ld2410's DEPENDENCIES = ["uart"] +uart: + baud_rate: 115200 + port: /dev/null + +uart_mock: + id: mock_uart + baud_rate: 256000 + injections: + # Phase 1 (t=100ms): Valid LD2410 engineering mode data frame + # Captured from a real Screek Human Presence Sensor 1U with LD2410 firmware 2.4.x + # + # Engineering mode frame layout (45 bytes): + # [0-3] F4 F3 F2 F1 = data frame header + # [4-5] 23 00 = length 35 + # [6] 01 = data type (engineering mode) + # [7] AA = data header marker + # [8] 03 = target states (moving+still) + # [9-10] 1E 00 = moving distance 30 (0x001E) + # [11] 64 = moving energy 100 + # [12-13] 1E 00 = still distance 30 (0x001E) + # [14] 64 = still energy 100 + # [15-16] 00 00 = detection distance 0 + # [17] 08 = max moving distance gate + # [18] 08 = max still distance gate + # [19-27] gate moving energies (gates 0-8) + # [28-36] gate still energies (gates 0-8) + # [37] 57 = light sensor value 87 + # [38] 01 = out pin presence (HIGH) + # [39] 55 = data footer marker + # [40] 00 = check + # [41-44] F8 F7 F6 F5 = data frame footer + - delay: 100ms + inject_rx: + [ + 0xF4, 0xF3, 0xF2, 0xF1, + 0x23, 0x00, + 0x01, 0xAA, + 0x03, + 0x1E, 0x00, + 0x64, + 0x1E, 0x00, + 0x64, + 0x00, 0x00, + 0x08, 0x08, + 0x64, 0x41, 0x06, 0x0E, 0x2B, 0x16, 0x03, 0x03, 0x07, + 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, + 0x57, 0x01, + 0x55, 0x00, + 0xF8, 0xF7, 0xF6, 0xF5, + ] + + # Phase 2 (t=200ms): Second engineering mode frame with different values + # Real capture: moving at 73cm, still at 30cm, detection at 33cm + - delay: 100ms + inject_rx: + [ + 0xF4, 0xF3, 0xF2, 0xF1, + 0x23, 0x00, + 0x01, 0xAA, + 0x03, + 0x49, 0x00, + 0x64, + 0x1E, 0x00, + 0x64, + 0x21, 0x00, + 0x08, 0x08, + 0x11, 0x64, 0x05, 0x29, 0x39, 0x10, 0x03, 0x11, 0x0E, + 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, + 0x57, 0x01, + 0x55, 0x00, + 0xF8, 0xF7, 0xF6, 0xF5, + ] + + # Phase 3 (t=300ms): Frame with still target at 291cm (multi-byte distance) + # This tests the two_byte_to_int function with high byte > 0 + # Note: low byte 0x2F < 0x80 so it avoids the signed char bug + - delay: 100ms + inject_rx: + [ + 0xF4, 0xF3, 0xF2, 0xF1, + 0x23, 0x00, + 0x01, 0xAA, + 0x03, + 0x2F, 0x00, + 0x36, + 0x23, 0x01, + 0x64, + 0x21, 0x00, + 0x08, 0x08, + 0x2F, 0x36, 0x09, 0x0D, 0x15, 0x0B, 0x06, 0x06, 0x08, + 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x5A, 0x3D, + 0x57, 0x01, + 0x55, 0x00, + 0xF8, 0xF7, 0xF6, 0xF5, + ] + +ld2410: + id: ld2410_dev + uart_id: mock_uart + +sensor: + - platform: ld2410 + ld2410_id: ld2410_dev + moving_distance: + name: "Moving Distance" + still_distance: + name: "Still Distance" + moving_energy: + name: "Moving Energy" + still_energy: + name: "Still Energy" + detection_distance: + name: "Detection Distance" + light: + name: "Light" + g0: + move_energy: + name: "Gate 0 Move Energy" + still_energy: + name: "Gate 0 Still Energy" + g1: + move_energy: + name: "Gate 1 Move Energy" + still_energy: + name: "Gate 1 Still Energy" + g2: + move_energy: + name: "Gate 2 Move Energy" + still_energy: + name: "Gate 2 Still Energy" + +binary_sensor: + - platform: ld2410 + ld2410_id: ld2410_dev + has_target: + name: "Has Target" + has_moving_target: + name: "Has Moving Target" + has_still_target: + name: "Has Still Target" + out_pin_presence_status: + name: "Out Pin Presence" diff --git a/tests/integration/test_uart_mock_ld2410.py b/tests/integration/test_uart_mock_ld2410.py new file mode 100644 index 0000000000..e01d6ff8e8 --- /dev/null +++ b/tests/integration/test_uart_mock_ld2410.py @@ -0,0 +1,392 @@ +"""Integration test for LD2410 component with mock UART. + +Tests: +test_uart_mock_ld2410 (normal mode): + 1. Happy path - valid data frame publishes correct sensor values + 2. Garbage resilience - random bytes don't crash the component + 3. Truncated frame handling - partial frame doesn't corrupt state + 4. Buffer overflow recovery - overflow resets the parser + 5. Post-overflow parsing - next valid frame after overflow is parsed correctly + 6. TX logging - verifies LD2410 sends expected setup commands + +test_uart_mock_ld2410_engineering (engineering mode): + 1. Engineering mode frames with per-gate energy data and light sensor + 2. Multi-byte still distance (291cm) using high byte > 0 + 3. Out pin presence binary sensor + 4. Gate energy sensor values from real device captures +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from aioesphomeapi import ( + BinarySensorInfo, + BinarySensorState, + EntityState, + SensorInfo, + SensorState, +) +import pytest + +from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_uart_mock_ld2410( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test LD2410 data parsing with happy path, garbage, overflow, and recovery.""" + # Replace external component path placeholder + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + loop = asyncio.get_running_loop() + + # Track overflow warning in logs + overflow_seen = loop.create_future() + + # Track TX data logged by the mock for assertions + tx_log_lines: list[str] = [] + + def line_callback(line: str) -> None: + if "Max command length exceeded" in line and not overflow_seen.done(): + overflow_seen.set_result(True) + # Capture all TX log lines from uart_mock + if "uart_mock" in line and "TX " in line: + tx_log_lines.append(line) + + # Track sensor state updates (after initial state is swallowed) + sensor_states: dict[str, list[float]] = { + "moving_distance": [], + "still_distance": [], + "moving_energy": [], + "still_energy": [], + "detection_distance": [], + } + binary_states: dict[str, list[bool]] = { + "has_target": [], + "has_moving_target": [], + "has_still_target": [], + } + + # Signal when we see recovery frame values + recovery_received = loop.create_future() + + def on_state(state: EntityState) -> None: + if isinstance(state, SensorState) and not state.missing_state: + sensor_name = key_to_sensor.get(state.key) + if sensor_name and sensor_name in sensor_states: + sensor_states[sensor_name].append(state.state) + # Check if this is the recovery frame (moving_distance = 50) + if ( + sensor_name == "moving_distance" + and state.state == pytest.approx(50.0) + and not recovery_received.done() + ): + recovery_received.set_result(True) + elif isinstance(state, BinarySensorState): + sensor_name = key_to_sensor.get(state.key) + if sensor_name and sensor_name in binary_states: + binary_states[sensor_name].append(state.state) + + async with ( + run_compiled(yaml_config, line_callback=line_callback), + api_client_connected() as client, + ): + entities, _ = await client.list_entities_services() + + # Build key mappings for all sensor types + all_names = list(sensor_states.keys()) + list(binary_states.keys()) + key_to_sensor = build_key_to_entity_mapping(entities, all_names) + + # Set up initial state helper + initial_state_helper = InitialStateHelper(entities) + client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + + try: + await initial_state_helper.wait_for_initial_states() + except TimeoutError: + pytest.fail("Timeout waiting for initial states") + + # Phase 1 values are in the initial states (swallowed by InitialStateHelper). + # Verify them via initial_states dict. + moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) + assert moving_dist_entity is not None + initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) + assert initial_moving is not None and isinstance(initial_moving, SensorState) + assert initial_moving.state == pytest.approx(100.0), ( + f"Initial moving distance should be 100, got {initial_moving.state}" + ) + + still_dist_entity = find_entity(entities, "still_distance", SensorInfo) + assert still_dist_entity is not None + initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) + assert initial_still is not None and isinstance(initial_still, SensorState) + assert initial_still.state == pytest.approx(120.0), ( + f"Initial still distance should be 120, got {initial_still.state}" + ) + + moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo) + assert moving_energy_entity is not None + initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key) + assert initial_me is not None and isinstance(initial_me, SensorState) + assert initial_me.state == pytest.approx(50.0), ( + f"Initial moving energy should be 50, got {initial_me.state}" + ) + + still_energy_entity = find_entity(entities, "still_energy", SensorInfo) + assert still_energy_entity is not None + initial_se = initial_state_helper.initial_states.get(still_energy_entity.key) + assert initial_se is not None and isinstance(initial_se, SensorState) + assert initial_se.state == pytest.approx(25.0), ( + f"Initial still energy should be 25, got {initial_se.state}" + ) + + detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo) + assert detect_dist_entity is not None + initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key) + assert initial_dd is not None and isinstance(initial_dd, SensorState) + assert initial_dd.state == pytest.approx(300.0), ( + f"Initial detection distance should be 300, got {initial_dd.state}" + ) + + # Wait for the recovery frame (Phase 5) to be parsed + # This proves the component survived garbage + truncated + overflow + try: + await asyncio.wait_for(recovery_received, timeout=15.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for recovery frame. Received sensor states:\n" + f" moving_distance: {sensor_states['moving_distance']}\n" + f" still_distance: {sensor_states['still_distance']}\n" + f" moving_energy: {sensor_states['moving_energy']}\n" + f" still_energy: {sensor_states['still_energy']}\n" + f" detection_distance: {sensor_states['detection_distance']}" + ) + + # Verify overflow warning was logged + assert overflow_seen.done(), ( + "Expected 'Max command length exceeded' warning in logs" + ) + + # Verify LD2410 sent setup commands (TX logging) + # LD2410 sends 7 commands during setup: FF (config on), A0 (version), + # A5 (MAC), AB (distance res), AE (light), 61 (params), FE (config off) + assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock" + tx_data = " ".join(tx_log_lines) + # Verify command frame header appears (FD:FC:FB:FA) + assert "FD:FC:FB:FA" in tx_data, ( + "Expected LD2410 command frame header FD:FC:FB:FA in TX log" + ) + # Verify command frame footer appears (04:03:02:01) + assert "04:03:02:01" in tx_data, ( + "Expected LD2410 command frame footer 04:03:02:01 in TX log" + ) + + # Recovery frame values (Phase 5, after overflow) + assert len(sensor_states["moving_distance"]) >= 1, ( + f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}" + ) + # Find the recovery value (moving_distance = 50) + recovery_values = [ + v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0) + ] + assert len(recovery_values) >= 1, ( + f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}" + ) + + # Recovery frame: moving=50, still=75, energy=100/80, detect=127 + recovery_idx = next( + i + for i, v in enumerate(sensor_states["moving_distance"]) + if v == pytest.approx(50.0) + ) + assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), ( + f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}" + ) + assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), ( + f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}" + ) + assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), ( + f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}" + ) + assert sensor_states["detection_distance"][recovery_idx] == pytest.approx( + 127.0 + ), ( + f"Recovery detection distance should be 127, got {sensor_states['detection_distance'][recovery_idx]}" + ) + + # Verify binary sensors detected targets + # Binary sensors could be in initial states or forwarded states + has_target_entity = find_entity(entities, "has_target", BinarySensorInfo) + assert has_target_entity is not None + initial_ht = initial_state_helper.initial_states.get(has_target_entity.key) + assert initial_ht is not None and isinstance(initial_ht, BinarySensorState) + assert initial_ht.state is True, "Has target should be True" + + has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo) + assert has_moving_entity is not None + initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key) + assert initial_hm is not None and isinstance(initial_hm, BinarySensorState) + assert initial_hm.state is True, "Has moving target should be True" + + has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo) + assert has_still_entity is not None + initial_hs = initial_state_helper.initial_states.get(has_still_entity.key) + assert initial_hs is not None and isinstance(initial_hs, BinarySensorState) + assert initial_hs.state is True, "Has still target should be True" + + +@pytest.mark.asyncio +async def test_uart_mock_ld2410_engineering( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test LD2410 engineering mode with per-gate energy, light, and multi-byte distance.""" + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + loop = asyncio.get_running_loop() + + # Track sensor state updates (after initial state is swallowed) + sensor_states: dict[str, list[float]] = { + "moving_distance": [], + "still_distance": [], + "moving_energy": [], + "still_energy": [], + "detection_distance": [], + "light": [], + "gate_0_move_energy": [], + "gate_1_move_energy": [], + "gate_2_move_energy": [], + "gate_0_still_energy": [], + "gate_1_still_energy": [], + "gate_2_still_energy": [], + } + binary_states: dict[str, list[bool]] = { + "has_target": [], + "has_moving_target": [], + "has_still_target": [], + "out_pin_presence": [], + } + + # Signal when we see Phase 3 frame (still_distance = 291) + phase3_received = loop.create_future() + + def on_state(state: EntityState) -> None: + if isinstance(state, SensorState) and not state.missing_state: + sensor_name = key_to_sensor.get(state.key) + if sensor_name and sensor_name in sensor_states: + sensor_states[sensor_name].append(state.state) + if ( + sensor_name == "still_distance" + and state.state == pytest.approx(291.0) + and not phase3_received.done() + ): + phase3_received.set_result(True) + elif isinstance(state, BinarySensorState): + sensor_name = key_to_sensor.get(state.key) + if sensor_name and sensor_name in binary_states: + binary_states[sensor_name].append(state.state) + + async with ( + run_compiled(yaml_config), + api_client_connected() as client, + ): + entities, _ = await client.list_entities_services() + + all_names = list(sensor_states.keys()) + list(binary_states.keys()) + key_to_sensor = build_key_to_entity_mapping(entities, all_names) + + initial_state_helper = InitialStateHelper(entities) + client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) + + try: + await initial_state_helper.wait_for_initial_states() + except TimeoutError: + pytest.fail("Timeout waiting for initial states") + + # Phase 1 initial values (engineering mode frame): + # moving=30, energy=100, still=30, energy=100, detect=0 + moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo) + assert moving_dist_entity is not None + initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key) + assert initial_moving is not None and isinstance(initial_moving, SensorState) + assert initial_moving.state == pytest.approx(30.0), ( + f"Initial moving distance should be 30, got {initial_moving.state}" + ) + + still_dist_entity = find_entity(entities, "still_distance", SensorInfo) + assert still_dist_entity is not None + initial_still = initial_state_helper.initial_states.get(still_dist_entity.key) + assert initial_still is not None and isinstance(initial_still, SensorState) + assert initial_still.state == pytest.approx(30.0), ( + f"Initial still distance should be 30, got {initial_still.state}" + ) + + # Verify engineering mode sensors from initial state + # Gate 0 moving energy = 0x64 = 100 + gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo) + assert gate0_move_entity is not None + initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key) + assert initial_g0m is not None and isinstance(initial_g0m, SensorState) + assert initial_g0m.state == pytest.approx(100.0), ( + f"Gate 0 move energy should be 100, got {initial_g0m.state}" + ) + + # Gate 1 moving energy = 0x41 = 65 + gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo) + assert gate1_move_entity is not None + initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key) + assert initial_g1m is not None and isinstance(initial_g1m, SensorState) + assert initial_g1m.state == pytest.approx(65.0), ( + f"Gate 1 move energy should be 65, got {initial_g1m.state}" + ) + + # Light sensor = 0x57 = 87 + light_entity = find_entity(entities, "light", SensorInfo) + assert light_entity is not None + initial_light = initial_state_helper.initial_states.get(light_entity.key) + assert initial_light is not None and isinstance(initial_light, SensorState) + assert initial_light.state == pytest.approx(87.0), ( + f"Light sensor should be 87, got {initial_light.state}" + ) + + # Out pin presence = 0x01 = True + out_pin_entity = find_entity(entities, "out_pin_presence", BinarySensorInfo) + assert out_pin_entity is not None + initial_out = initial_state_helper.initial_states.get(out_pin_entity.key) + assert initial_out is not None and isinstance(initial_out, BinarySensorState) + assert initial_out.state is True, "Out pin presence should be True" + + # Wait for Phase 3 frame (still_distance = 291cm, multi-byte) + try: + await asyncio.wait_for(phase3_received, timeout=15.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for Phase 3 frame. Received sensor states:\n" + f" still_distance: {sensor_states['still_distance']}\n" + f" moving_distance: {sensor_states['moving_distance']}" + ) + + # Phase 3: still distance = 0x0123 = 291cm (multi-byte distance test) + phase3_still = [ + v for v in sensor_states["still_distance"] if v == pytest.approx(291.0) + ] + assert len(phase3_still) >= 1, ( + f"Expected still_distance=291, got: {sensor_states['still_distance']}" + )