[modbus] Add integration tests for server and server via controller (#14845)

Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Bonne Eggleston
2026-04-03 13:24:02 -07:00
committed by GitHub
parent f8f65c1a7b
commit c6bb1fe141
10 changed files with 1136 additions and 229 deletions

View File

@@ -198,6 +198,13 @@ async def yaml_config(request: pytest.FixtureRequest, unused_tcp_port: int) -> s
' - "-g" # Add debug symbols',
)
# Replace external component path placeholder if present
if "EXTERNAL_COMPONENT_PATH" in content:
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
content = content.replace("EXTERNAL_COMPONENT_PATH", external_components_path)
return content

View File

@@ -22,6 +22,8 @@ uart_mock:
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
# auto_start must be false to avoid races: the test presses the
# "Start Scenario" button only after subscribing to states.
auto_start: false
debug:
responses:
@@ -46,7 +48,7 @@ modbus:
modbus_controller:
- address: 1
id: modbus_controller_ok
max_cmd_retries: 0
max_cmd_retries: 2
update_interval: 1s
- address: 2
id: modbus_controller_slow
@@ -89,4 +91,4 @@ button:
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(virtual_uart_dev).start_scenario();'
- lambda: "id(virtual_uart_dev).start_scenario();"

View File

@@ -22,6 +22,8 @@ uart:
uart_mock:
- id: virtual_uart_dev
baud_rate: 9600
# auto_start must be false to avoid races: the test presses the
# "Start Scenario" button only after subscribing to states.
auto_start: false
debug:
on_tx:
@@ -40,7 +42,8 @@ uart_mock:
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00};
- uart_mock.inject_rx: # Second USB packet: rest of response (staged with 40ms latency)
delay: 40ms
data: !lambda return{0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
data:
!lambda return{0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x42,0x6F,0xCC,0xCD,0x43,0x7C,0xB8,0x10,0x3D,0x38,0x51,0xEC,
0x43,0x81,0x1B,0xE7,0x3B,0x03,0x12,0x6F,0x50,0x1B};
@@ -61,4 +64,4 @@ button:
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(virtual_uart_dev).start_scenario();'
- lambda: "id(virtual_uart_dev).start_scenario();"

View File

@@ -0,0 +1,124 @@
esphome:
name: uart-mock-modbus-server-test
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_dev
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
auto_start: false
debug:
injections:
- delay: 100ms
inject_rx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 3 on device 1 (basic_read)
- delay: 100ms
# Read holding register 7 on device 2
# Reply from device 2
# Read holding register 5 on device 1 (read_after_peer_response)
inject_rx:
[
0x02,
0x03,
0x00,
0x07,
0x00,
0x01,
0x35,
0xF8,
0x02,
0x03,
0x02,
0x00,
0xF0,
0xFC,
0x00,
0x01,
0x03,
0x00,
0x05,
0x00,
0x01,
0x94,
0x0B,
]
- delay: 100ms
inject_rx: [0x02, 0x03, 0x00, 0x07, 0x00, 0x01, 0x35, 0xF8] # Read holding register 7 on device 2, with no response
- delay: 100ms
# Read holding register 7 on device 2, with no response
# Read holding register A on device 1 (read_after_peer_timeout)
inject_rx:
[
0x02,
0x03,
0x00,
0x07,
0x00,
0x01,
0x35,
0xF8,
0x01,
0x03,
0x00,
0x0A,
0x00,
0x01,
0xA4,
0x08,
]
modbus:
uart_id: virtual_uart_dev
role: server
modbus_controller:
- address: 1
server_registers:
- address: 0x03
value_type: U_WORD
read_lambda: |-
id(basic_read).publish_state(1);
return 1;
- address: 0x05
value_type: U_WORD
read_lambda: |-
id(read_after_peer_response).publish_state(1);
return 1;
- address: 0x0A
value_type: U_WORD
read_lambda: |-
id(read_after_peer_timeout).publish_state(1);
return 1;
sensor:
- platform: template
name: "basic_read"
id: basic_read
- platform: template
name: "read_after_peer_response"
id: read_after_peer_response
- platform: template
name: "read_after_peer_timeout"
id: read_after_peer_timeout
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: "id(virtual_uart_dev).start_scenario();"

View File

@@ -0,0 +1,180 @@
esphome:
name: uart-mock-modbus-server-contro
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_server
baud_rate: 9600
# auto_start must be true for loopback fixtures: the modbus controller
# polls on its update_interval immediately at boot, so the uart_mock
# forwarding must already be active or early requests are lost and
# generate modbus warnings.
auto_start: true
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_controller
data: !lambda return data;
- id: virtual_uart_controller
baud_rate: 9600
auto_start: true # See comment on virtual_uart_server above
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_server
data: !lambda return data;
modbus:
- uart_id: virtual_uart_server
id: virtual_modbus_server
role: server
- uart_id: virtual_uart_controller
id: virtual_modbus_controller
role: client
turnaround_time: 10ms
modbus_controller:
- address: 1
modbus_id: virtual_modbus_controller
update_interval: 1s
id: modbus_controller_1
- address: 1
modbus_id: virtual_modbus_server
id: modbus_server_1
server_registers:
- address: 0x01
value_type: U_WORD
read_lambda: return 99;
- address: 0x03
value_type: S_WORD
read_lambda: return -99;
- address: 0x05
value_type: U_DWORD
read_lambda: return 16909060;
- address: 0x08
value_type: S_DWORD
read_lambda: return -16909060;
- address: 0x0B
value_type: U_DWORD_R
read_lambda: return 67305985;
- address: 0x0E
value_type: S_DWORD_R
read_lambda: return -67305985;
- address: 0x11
value_type: U_QWORD
read_lambda: return 72623859790382856;
- address: 0x16
value_type: S_QWORD
read_lambda: return -72623859790382856;
- address: 0x1B
value_type: U_QWORD_R
read_lambda: return 578437695752307201;
- address: 0x20
value_type: S_QWORD_R
read_lambda: return -578437695752307201;
- address: 0x25
value_type: FP32
read_lambda: return 3.14;
- address: 0x28
value_type: FP32_R
read_lambda: return 3.14;
sensor:
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_word"
address: 0x01
register_type: holding
value_type: U_WORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_word"
address: 0x03
register_type: holding
value_type: S_WORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_dword"
address: 0x05
register_type: holding
value_type: U_DWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_dword"
address: 0x08
register_type: holding
value_type: S_DWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_dword_r"
address: 0x0B
register_type: holding
value_type: U_DWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_dword_r"
address: 0x0E
register_type: holding
value_type: S_DWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_qword"
address: 0x11
register_type: holding
value_type: U_QWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_qword"
address: 0x16
register_type: holding
value_type: S_QWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_qword_r"
address: 0x1B
register_type: holding
value_type: U_QWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_qword_r"
address: 0x20
register_type: holding
value_type: S_QWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_fp32"
address: 0x25
register_type: holding
value_type: FP32
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_fp32_r"
address: 0x28
register_type: holding
value_type: FP32_R
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: "id(virtual_uart_server).start_scenario();"
- lambda: "id(virtual_uart_controller).start_scenario();"

View File

@@ -0,0 +1,118 @@
esphome:
name: uart-mock-modbus-server-mult
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_server
baud_rate: 9600
# auto_start must be true for loopback fixtures: the modbus controller
# polls on its update_interval immediately at boot, so the uart_mock
# forwarding must already be active or early requests are lost and
# generate modbus warnings.
auto_start: true
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_controller
data: !lambda return data;
- uart_mock.inject_rx:
id: virtual_uart_server_2
data: !lambda return data;
- id: virtual_uart_server_2
baud_rate: 9600
auto_start: true # See comment on virtual_uart_server above
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_server
data: !lambda return data;
- uart_mock.inject_rx:
id: virtual_uart_controller
data: !lambda return data;
- id: virtual_uart_controller
baud_rate: 9600
auto_start: true # See comment on virtual_uart_server above
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_server
data: !lambda return data;
- uart_mock.inject_rx:
id: virtual_uart_server_2
data: !lambda return data;
modbus:
- uart_id: virtual_uart_server
id: virtual_modbus_server
role: server
- uart_id: virtual_uart_server_2
id: virtual_modbus_server_2
role: server
- uart_id: virtual_uart_controller
id: virtual_modbus_client
role: client
turnaround_time: 10ms
modbus_controller:
- address: 1
modbus_id: virtual_modbus_client
update_interval: 1s
id: modbus_controller_1
- address: 2
modbus_id: virtual_modbus_client
update_interval: 1s
id: modbus_controller_2
- address: 1
modbus_id: virtual_modbus_server
server_registers:
- address: 0x01
value_type: U_WORD
read_lambda: return 919;
- address: 2
modbus_id: virtual_modbus_server_2
server_registers:
- address: 0x01
value_type: U_WORD
read_lambda: return 929;
sensor:
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_word"
address: 0x01
register_type: holding
value_type: U_WORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_2
name: "reg_u_word_2"
address: 0x01
register_type: holding
value_type: U_WORD
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: "id(virtual_uart_server).start_scenario();"
- lambda: "id(virtual_uart_server_2).start_scenario();"
- lambda: "id(virtual_uart_controller).start_scenario();"

View File

@@ -0,0 +1,330 @@
esphome:
name: uart-mock-modbus-srv-write
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_server
baud_rate: 9600
# auto_start must be true for loopback fixtures: the modbus controller
# polls on its update_interval immediately at boot, so the uart_mock
# forwarding must already be active or early requests are lost and
# generate modbus warnings.
auto_start: true
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_controller
data: !lambda return data;
- id: virtual_uart_controller
baud_rate: 9600
auto_start: true # See comment on virtual_uart_server above
debug:
on_tx:
- then:
- uart_mock.inject_rx:
id: virtual_uart_server
data: !lambda return data;
globals:
- id: stored_u_word
type: uint16_t
initial_value: "11"
- id: stored_s_word
type: int16_t
initial_value: "-11"
- id: stored_u_dword
type: uint32_t
initial_value: "1001"
- id: stored_s_dword
type: int32_t
initial_value: "-1001"
- id: stored_u_dword_r
type: uint32_t
initial_value: "3003"
- id: stored_s_dword_r
type: int32_t
initial_value: "-3003"
- id: stored_u_qword
type: uint64_t
initial_value: "5005"
- id: stored_s_qword
type: int64_t
initial_value: "-5005"
- id: stored_u_qword_r
type: uint64_t
initial_value: "7007"
- id: stored_s_qword_r
type: int64_t
initial_value: "-7007"
- id: stored_fp32
type: float
initial_value: "1.5"
- id: stored_fp32_r
type: float
initial_value: "2.5"
modbus:
- uart_id: virtual_uart_server
id: virtual_modbus_server
role: server
- uart_id: virtual_uart_controller
id: virtual_modbus_controller
role: client
turnaround_time: 10ms
modbus_controller:
- address: 1
modbus_id: virtual_modbus_controller
update_interval: 2s
id: modbus_controller_1
- address: 1
modbus_id: virtual_modbus_server
id: modbus_server_1
server_registers:
- address: 0x01
value_type: U_WORD
read_lambda: return id(stored_u_word);
write_lambda: id(stored_u_word) = x; return true;
- address: 0x03
value_type: S_WORD
read_lambda: return id(stored_s_word);
write_lambda: id(stored_s_word) = x; return true;
- address: 0x05
value_type: U_DWORD
read_lambda: return id(stored_u_dword);
write_lambda: id(stored_u_dword) = x; return true;
- address: 0x08
value_type: S_DWORD
read_lambda: return id(stored_s_dword);
write_lambda: id(stored_s_dword) = x; return true;
- address: 0x0B
value_type: U_DWORD_R
read_lambda: return id(stored_u_dword_r);
write_lambda: id(stored_u_dword_r) = x; return true;
- address: 0x0E
value_type: S_DWORD_R
read_lambda: return id(stored_s_dword_r);
write_lambda: id(stored_s_dword_r) = x; return true;
- address: 0x11
value_type: U_QWORD
read_lambda: return id(stored_u_qword);
write_lambda: id(stored_u_qword) = x; return true;
- address: 0x16
value_type: S_QWORD
read_lambda: return id(stored_s_qword);
write_lambda: id(stored_s_qword) = x; return true;
- address: 0x1B
value_type: U_QWORD_R
read_lambda: return id(stored_u_qword_r);
write_lambda: id(stored_u_qword_r) = x; return true;
- address: 0x20
value_type: S_QWORD_R
read_lambda: return id(stored_s_qword_r);
write_lambda: id(stored_s_qword_r) = x; return true;
- address: 0x25
value_type: FP32
read_lambda: return id(stored_fp32);
write_lambda: id(stored_fp32) = x; return true;
- address: 0x28
value_type: FP32_R
read_lambda: return id(stored_fp32_r);
write_lambda: id(stored_fp32_r) = x; return true;
sensor:
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_word"
address: 0x01
register_type: holding
value_type: U_WORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_word"
address: 0x03
register_type: holding
value_type: S_WORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_dword"
address: 0x05
register_type: holding
value_type: U_DWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_dword"
address: 0x08
register_type: holding
value_type: S_DWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_dword_r"
address: 0x0B
register_type: holding
value_type: U_DWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_dword_r"
address: 0x0E
register_type: holding
value_type: S_DWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_qword"
address: 0x11
register_type: holding
value_type: U_QWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_qword"
address: 0x16
register_type: holding
value_type: S_QWORD
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_u_qword_r"
address: 0x1B
register_type: holding
value_type: U_QWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_s_qword_r"
address: 0x20
register_type: holding
value_type: S_QWORD_R
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_fp32"
address: 0x25
register_type: holding
value_type: FP32
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "reg_fp32_r"
address: 0x28
register_type: holding
value_type: FP32_R
number:
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_u_word"
address: 0x01
register_type: holding
value_type: U_WORD
min_value: 0
max_value: 65535
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_s_word"
address: 0x03
register_type: holding
value_type: S_WORD
min_value: -16777215
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_u_dword"
address: 0x05
register_type: holding
value_type: U_DWORD
min_value: 0
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_s_dword"
address: 0x08
register_type: holding
value_type: S_DWORD
min_value: -16777215
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_u_dword_r"
address: 0x0B
register_type: holding
value_type: U_DWORD_R
min_value: 0
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_s_dword_r"
address: 0x0E
register_type: holding
value_type: S_DWORD_R
min_value: -16777215
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_u_qword"
address: 0x11
register_type: holding
value_type: U_QWORD
min_value: 0
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_s_qword"
address: 0x16
register_type: holding
value_type: S_QWORD
min_value: -16777215
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_u_qword_r"
address: 0x1B
register_type: holding
value_type: U_QWORD_R
min_value: 0
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_s_qword_r"
address: 0x20
register_type: holding
value_type: S_QWORD_R
min_value: -16777215
max_value: 16777215
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_fp32"
address: 0x25
register_type: holding
value_type: FP32
min_value: -16777215
max_value: 16777215
step: 0.01
- platform: modbus_controller
modbus_controller_id: modbus_controller_1
name: "write_fp32_r"
address: 0x28
register_type: holding
value_type: FP32_R
min_value: -16777215
max_value: 16777215
step: 0.01
button:
- platform: template
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: "id(virtual_uart_server).start_scenario();"
- lambda: "id(virtual_uart_controller).start_scenario();"

View File

@@ -22,6 +22,8 @@ uart_mock:
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
# auto_start must be false to avoid races: the test presses the
# "Start Scenario" button only after subscribing to states.
auto_start: false
debug:
on_tx:
@@ -61,4 +63,4 @@ button:
name: "Start Scenario"
id: start_scenario_btn
on_press:
- lambda: 'id(virtual_uart_dev).start_scenario();'
- lambda: "id(virtual_uart_dev).start_scenario();"

View File

@@ -346,3 +346,109 @@ class SensorStateCollector:
else:
self._waiters.append((condition, future))
return future
class SensorTracker:
"""Data-driven sensor state tracker with expected-value futures.
Tracks sensor state updates and resolves futures when sensors report
specific expected values. Eliminates per-sensor future boilerplate.
Usage::
tracker = SensorTracker(["reg_u_word", "reg_s_word"])
futures = tracker.expect_all({"reg_u_word": 99, "reg_s_word": -99})
# ... subscribe_states with tracker.on_state, start scenario ...
await tracker.await_all(futures)
"""
def __init__(self, sensor_names: list[str]) -> None:
self.sensor_states: dict[str, list[float]] = {name: [] for name in sensor_names}
self.key_to_sensor: dict[int, str] = {}
self._expectations: dict[str, list[tuple[object, asyncio.Future]]] = {}
_ANY = object() # Sentinel: match any value
def expect(self, name: str, value: object) -> asyncio.Future:
"""Register an expected value for *name* and return a future for it."""
future: asyncio.Future = asyncio.get_running_loop().create_future()
self._expectations.setdefault(name, []).append((value, future))
return future
def expect_any(self, name: str) -> asyncio.Future:
"""Register a future that resolves on *any* state update for *name*."""
return self.expect(name, self._ANY)
def expect_all(self, expected: dict[str, object]) -> dict[str, asyncio.Future]:
"""Call ``expect`` for every entry and return a dict of futures."""
return {name: self.expect(name, value) for name, value in expected.items()}
def on_state(self, state: EntityState) -> None:
"""State callback suitable for ``subscribe_states``."""
if not isinstance(state, SensorState) or state.missing_state:
return
sensor_name = self.key_to_sensor.get(state.key)
if not sensor_name or sensor_name not in self.sensor_states:
return
self.sensor_states[sensor_name].append(state.state)
for expected_value, future in self._expectations.get(sensor_name, []):
if not future.done() and (
expected_value is self._ANY or state.state == expected_value
):
future.set_result(True)
break
async def await_change(
self, future: asyncio.Future, name: str, timeout: float = 2.0
) -> None:
"""Wait for a sensor future to resolve; fail the test on timeout."""
try:
await asyncio.wait_for(future, timeout=timeout)
except TimeoutError:
import pytest
pytest.fail(
f"Timeout waiting for {name} change. Received sensor states:\n"
f" {name}: {self.sensor_states[name]}\n"
)
async def await_must_not_change(
self, future: asyncio.Future, name: str, timeout: float = 2.0
) -> None:
"""Assert a sensor future does NOT resolve within the timeout."""
try:
await asyncio.wait_for(future, timeout=timeout)
except TimeoutError:
return # Expected
import pytest
pytest.fail(
f"{name} change should not have been triggered, but was. "
f"Received sensor states:\n {name}: {self.sensor_states[name]}\n"
)
async def await_all(
self, futures: dict[str, asyncio.Future], timeout: float = 2.0
) -> None:
"""Await every future in *futures*, failing with per-sensor diagnostics."""
for name, future in futures.items():
await self.await_change(future, name, timeout=timeout)
async def setup_and_start_scenario(self, client) -> list:
"""Wire up subscriptions, wait for initial states, press Start Scenario."""
entities, _ = await client.list_entities_services()
self.key_to_sensor.update(
build_key_to_entity_mapping(entities, list(self.sensor_states.keys()))
)
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(self.on_state))
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
import pytest
pytest.fail("Timeout waiting for initial states")
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
return entities

View File

@@ -14,15 +14,67 @@ test_uart_mock_modbus_no_threshold :
from __future__ import annotations
import asyncio
from pathlib import Path
from collections.abc import Callable
from dataclasses import dataclass
from aioesphomeapi import ButtonInfo, EntityState, SensorState
from aioesphomeapi import NumberInfo
import pytest
from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity
from .state_utils import SensorTracker, find_entity
from .types import APIClientConnectedFactory, RunCompiledFunction
@dataclass
class RegisterTestCase:
"""Test parameters for a single modbus register write/read round-trip."""
initial_value: object
write_number_name: str
write_value: float
post_write_value: object
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_modbus_line_callback() -> tuple[Callable[[str], None], list[str], list[str]]:
"""Return a (callback, error_lines, warning_lines) tuple for tracking modbus log output.
Only captures bus-level modbus messages ([modbus:]), not modbus_controller
scheduling noise (e.g. "Duplicate modbus command found").
"""
error_log_lines: list[str] = []
warning_log_lines: list[str] = []
def line_callback(line: str) -> None:
if "[E][modbus:" in line:
error_log_lines.append(line)
if "[W][modbus:" in line:
warning_log_lines.append(line)
return line_callback, error_log_lines, warning_log_lines
def _assert_no_modbus_errors(
error_log_lines: list[str], warning_log_lines: list[str]
) -> None:
assert len(error_log_lines) == 0, (
"Expect no errors logged by the modbus mock, but got:\n"
+ "\n".join(error_log_lines)
)
assert len(warning_log_lines) == 0, (
"Expect no warnings logged by the modbus mock, but got:\n"
+ "\n".join(warning_log_lines)
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_uart_mock_modbus(
yaml_config: str,
@@ -30,127 +82,41 @@ async def test_uart_mock_modbus(
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic modbus data parsing."""
# Replace external component path placeholder
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
tracker = SensorTracker(
[
"basic_register",
"delayed_response",
"late_response",
"no_response",
"exception_response",
]
)
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"basic_register": [],
"delayed_response": [],
"late_response": [],
"no_response": [],
"exception_response": [],
}
basic_register_changed = loop.create_future()
delayed_response_changed = loop.create_future()
late_response_changed = loop.create_future()
no_response_changed = loop.create_future()
exception_response_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
if (
sensor_name == "basic_register"
and state.state == 259.0
and not basic_register_changed.done()
):
basic_register_changed.set_result(True)
elif (
sensor_name == "delayed_response"
and state.state == 255.0
and not delayed_response_changed.done()
):
delayed_response_changed.set_result(True)
elif (
sensor_name == "late_response" and not late_response_changed.done()
):
late_response_changed.set_result(True)
elif sensor_name == "no_response" and not no_response_changed.done():
no_response_changed.set_result(True)
elif (
sensor_name == "exception_response"
and not exception_response_changed.done()
):
exception_response_changed.set_result(True)
basic_register_changed = tracker.expect("basic_register", 259.0)
delayed_response_changed = tracker.expect("delayed_response", 255.0)
# late_response / no_response / exception_response: expect *any* value
# (these should never fire, so we use a permissive match via expect_any)
late_response_changed = tracker.expect_any("late_response")
no_response_changed = tracker.expect_any("no_response")
exception_response_changed = tracker.expect_any("exception_response")
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
await tracker.setup_and_start_scenario(client)
# Build key mappings for all sensor types
all_names = list(sensor_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
try:
await asyncio.wait_for(delayed_response_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for delayed_response change. Received sensor states:\n"
f" delayed_response: {sensor_states['delayed_response']}\n"
)
try:
await asyncio.wait_for(late_response_changed, timeout=2.0)
pytest.fail(
f"late_response change should not have been triggered, but was. Received sensor states:\n"
f" late_response: {sensor_states['late_response']}\n"
)
except TimeoutError:
pass # Expected timeout since we never inject a response for late_response
try:
await asyncio.wait_for(no_response_changed, timeout=2.0)
pytest.fail(
f"no_response change should not have been triggered, but was. Received sensor states:\n"
f" no_response: {sensor_states['no_response']}\n"
)
except TimeoutError:
pass # Expected timeout since we never inject a response for no_response
# Wait for basic register to be updated with successful parse
try:
await asyncio.wait_for(basic_register_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Basic Register change. Received sensor states:\n"
f" basic_register: {sensor_states['basic_register']}\n"
)
try:
await asyncio.wait_for(exception_response_changed, timeout=2.0)
pytest.fail(
f"exception_response change should not have been triggered, but was. Received sensor states:\n"
f" exception_response: {sensor_states['exception_response']}\n"
)
except TimeoutError:
pass
await tracker.await_change(delayed_response_changed, "delayed_response")
await tracker.await_change(basic_register_changed, "basic_register")
# Run all "must not change" checks concurrently — each waits the full
# timeout, so sequential execution would multiply the wall time.
await asyncio.gather(
tracker.await_must_not_change(late_response_changed, "late_response"),
tracker.await_must_not_change(no_response_changed, "no_response"),
tracker.await_must_not_change(
exception_response_changed, "exception_response"
),
)
@pytest.mark.asyncio
@@ -159,69 +125,17 @@ async def test_uart_mock_modbus_timing(
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic modbus data parsing."""
# Replace external component path placeholder
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
"""Test modbus timing with multi-register SDM meter response."""
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"sdm_voltage": [],
}
voltage_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
# Check if this is a good voltage reading (243V)
if (
sensor_name == "sdm_voltage"
and state.state > 200.0
and not voltage_changed.done()
):
voltage_changed.set_result(True)
tracker = SensorTracker(["sdm_voltage"])
voltage_changed = tracker.expect_any("sdm_voltage")
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
# Build key mappings for all sensor types
all_names = list(sensor_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
# Wait for voltage to be updated with successful parse
try:
await asyncio.wait_for(voltage_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for SDM voltage change. Received sensor states:\n"
f" sdm_voltage: {sensor_states['sdm_voltage']}\n"
)
await tracker.setup_and_start_scenario(client)
await tracker.await_change(voltage_changed, "sdm_voltage")
@pytest.mark.asyncio
@@ -234,66 +148,187 @@ async def test_uart_mock_modbus_no_threshold(
Without the 50ms fallback timeout, the chunked response with a 40ms gap
between USB packets would cause a false timeout and CRC failure cascade.
Bus-level warnings (CRC failures, buffer clears) are expected during
chunked reassembly — the test only verifies the final value arrives.
"""
# Replace external component path placeholder
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"sdm_voltage": [],
}
voltage_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
# Check if this is a good voltage reading (243V)
if (
sensor_name == "sdm_voltage"
and state.state > 200.0
and not voltage_changed.done()
):
voltage_changed.set_result(True)
tracker = SensorTracker(["sdm_voltage"])
voltage_changed = tracker.expect_any("sdm_voltage")
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
await tracker.setup_and_start_scenario(client)
await tracker.await_change(voltage_changed, "sdm_voltage")
# Build key mappings for all sensor types
all_names = list(sensor_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="Modbus parser cannot handle server responses from other devices on the bus. Fix tracked in PR #11969.",
strict=True,
)
async def test_uart_mock_modbus_server(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test modbus server parsing with peer traffic on a shared bus."""
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
line_callback, error_log_lines, warning_log_lines = _make_modbus_line_callback()
# Start the UART mock scenario now that we're subscribed
start_btn = find_entity(entities, "start_scenario", ButtonInfo)
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
tracker = SensorTracker(
["basic_read", "read_after_peer_response", "read_after_peer_timeout"]
)
futures = tracker.expect_all(
{
"basic_read": 1,
"read_after_peer_response": 1,
"read_after_peer_timeout": 1,
}
)
# Wait for voltage to be updated with successful parse
try:
await asyncio.wait_for(voltage_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for SDM voltage change. Received sensor states:\n"
f" sdm_voltage: {sensor_states['sdm_voltage']}\n"
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
await tracker.setup_and_start_scenario(client)
await tracker.await_all(futures)
_assert_no_modbus_errors(error_log_lines, warning_log_lines)
@pytest.mark.asyncio
async def test_uart_mock_modbus_server_controller(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test server/controller functionality for all read register types."""
line_callback, error_log_lines, warning_log_lines = _make_modbus_line_callback()
expected_values = {
"reg_u_word": 99,
"reg_s_word": -99,
"reg_u_dword": 16909060,
"reg_s_dword": -16909060,
"reg_u_dword_r": pytest.approx(67305985),
"reg_s_dword_r": pytest.approx(-67305985),
"reg_u_qword": pytest.approx(72623859790382856),
"reg_s_qword": pytest.approx(-72623859790382856),
"reg_u_qword_r": pytest.approx(578437695752307201),
"reg_s_qword_r": pytest.approx(-578437695752307201),
"reg_fp32": pytest.approx(3.14),
"reg_fp32_r": pytest.approx(3.14),
}
tracker = SensorTracker(list(expected_values.keys()))
futures = tracker.expect_all(expected_values)
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
await tracker.setup_and_start_scenario(client)
await tracker.await_all(futures)
_assert_no_modbus_errors(error_log_lines, warning_log_lines)
@pytest.mark.asyncio
async def test_uart_mock_modbus_server_controller_write(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test server/controller write functionality for all register value types.
Verifies that writing to modbus server registers via the controller updates
the server's stored values, which are then read back correctly on the next poll.
All 12 value types are tested: U/S_WORD, U/S_DWORD(_R), U/S_QWORD(_R), FP32(_R).
"""
line_callback, error_log_lines, warning_log_lines = _make_modbus_line_callback()
register_test_cases: dict[str, RegisterTestCase] = {
"reg_u_word": RegisterTestCase(11, "write_u_word", 42, 42),
"reg_s_word": RegisterTestCase(-11, "write_s_word", -42, -42),
"reg_u_dword": RegisterTestCase(1001, "write_u_dword", 2002, 2002),
"reg_s_dword": RegisterTestCase(-1001, "write_s_dword", -2002, -2002),
"reg_u_dword_r": RegisterTestCase(3003, "write_u_dword_r", 4004, 4004),
"reg_s_dword_r": RegisterTestCase(-3003, "write_s_dword_r", -4004, -4004),
"reg_u_qword": RegisterTestCase(5005, "write_u_qword", 6006, 6006),
"reg_s_qword": RegisterTestCase(-5005, "write_s_qword", -6006, -6006),
"reg_u_qword_r": RegisterTestCase(7007, "write_u_qword_r", 8008, 8008),
"reg_s_qword_r": RegisterTestCase(-7007, "write_s_qword_r", -8008, -8008),
"reg_fp32": RegisterTestCase(
pytest.approx(1.5, abs=0.01),
"write_fp32",
3.14,
pytest.approx(3.14, abs=0.01),
),
"reg_fp32_r": RegisterTestCase(
pytest.approx(2.5, abs=0.01),
"write_fp32_r",
6.28,
pytest.approx(6.28, abs=0.01),
),
}
tracker = SensorTracker(list(register_test_cases.keys()))
# Phase 1: expect initial baseline values
initial_futures = tracker.expect_all(
{name: case.initial_value for name, case in register_test_cases.items()}
)
# Phase 2: expect post-write values (registered now so on_state can match them)
written_futures = tracker.expect_all(
{name: case.post_write_value for name, case in register_test_cases.items()}
)
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
entities = await tracker.setup_and_start_scenario(client)
# Wait for initial baseline values to confirm the controller <-> server
# connection is working before issuing writes
await tracker.await_all(initial_futures, timeout=4.0)
# Issue write commands for all register types
for case in register_test_cases.values():
entity = find_entity(entities, case.write_number_name, NumberInfo)
assert entity is not None, (
f"{case.write_number_name} number entity not found"
)
client.number_command(entity.key, case.write_value)
# Wait for sensors to reflect the written values (round-trip write+read)
await tracker.await_all(written_futures, timeout=4.0)
_assert_no_modbus_errors(error_log_lines, warning_log_lines)
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="Modbus parser cannot handle server responses from other devices on the bus. Fix tracked in PR #11969.",
strict=True,
)
async def test_uart_mock_modbus_server_controller_multiple(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test server/controller functionality with multiple servers."""
line_callback, error_log_lines, warning_log_lines = _make_modbus_line_callback()
expected_values = {"reg_u_word": 919, "reg_u_word_2": 929}
tracker = SensorTracker(list(expected_values.keys()))
futures = tracker.expect_all(expected_values)
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
await tracker.setup_and_start_scenario(client)
await tracker.await_all(futures)
_assert_no_modbus_errors(error_log_lines, warning_log_lines)