[modbus] Fix timing bugs and better adhere to spec (#8032)

Co-authored-by: brambo123 <52667932+brambo123@users.noreply.github.com>
Co-authored-by: Keith Burzinski <kbx81x@gmail.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick+github@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Bonne Eggleston
2026-03-05 12:54:17 -08:00
committed by GitHub
parent d11e7cab46
commit b0be02e16d
14 changed files with 396 additions and 121 deletions

View File

@@ -26,7 +26,7 @@ void GrowattSolar::update() {
}
// The bus might be slow, or there might be other devices, or other components might be talking to our device.
if (this->waiting_for_response()) {
if (!this->ready_for_immediate_send()) {
this->waiting_to_update_ = true;
return;
}

View File

@@ -20,6 +20,7 @@ MULTI_CONF = True
CONF_ROLE = "role"
CONF_MODBUS_ID = "modbus_id"
CONF_SEND_WAIT_TIME = "send_wait_time"
CONF_TURNAROUND_TIME = "turnaround_time"
ModbusRole = modbus_ns.enum("ModbusRole")
MODBUS_ROLES = {
@@ -36,6 +37,9 @@ CONFIG_SCHEMA = (
cv.Optional(
CONF_SEND_WAIT_TIME, default="250ms"
): cv.positive_time_period_milliseconds,
cv.Optional(
CONF_TURNAROUND_TIME, default="100ms"
): cv.positive_time_period_milliseconds,
cv.Optional(CONF_DISABLE_CRC, default=False): cv.boolean,
}
)
@@ -57,6 +61,7 @@ async def to_code(config):
cg.add(var.set_flow_control_pin(pin))
cg.add(var.set_send_wait_time(config[CONF_SEND_WAIT_TIME]))
cg.add(var.set_turnaround_time(config[CONF_TURNAROUND_TIME]))
cg.add(var.set_disable_crc(config[CONF_DISABLE_CRC]))

View File

@@ -15,10 +15,69 @@ void Modbus::setup() {
if (this->flow_control_pin_ != nullptr) {
this->flow_control_pin_->setup();
}
}
void Modbus::loop() {
const uint32_t now = App.get_loop_component_start_time();
this->frame_delay_ms_ =
std::max(2, // 1750us minimum per spec - rounded up to 2ms.
// 3.5 characters * 11 bits per character * 1000ms/sec / (bits/sec) (Standard modbus frame delay)
(uint16_t) (3.5 * 11 * 1000 / this->parent_->get_baud_rate()) + 1);
this->long_rx_buffer_delay_ms_ =
(this->parent_->get_rx_full_threshold() * 11 * 1000 / this->parent_->get_baud_rate()) + 1;
}
void Modbus::loop() {
// First process all available incoming data.
this->receive_and_parse_modbus_bytes_();
// If the response frame is finished (including interframe delay) - we timeout.
// The long_rx_buffer_delay accounts for long responses (larger than the UART rx_full_threshold) to avoid timeouts
// when the buffer is filling the back half of the response
const uint16_t timeout = std::max(
(uint16_t) this->frame_delay_ms_,
(uint16_t) (this->rx_buffer_.size() >= this->parent_->get_rx_full_threshold() ? this->long_rx_buffer_delay_ms_
: 0));
// We use millis() here and elsewhere instead of App.get_loop_component_start_time() to avoid stale timestamps
// It's critical in all timestamp comparisons that the left timestamp comes before the right one in time
// If we use a cached value in place of millis() and last_modbus_byte_ is updated inside our loop
// then the comparison is backwards (small negative which wraps to large positive) and will cause a false timeout
// So in this component we don't use any cached timestamp values to avoid these annoying bugs
if (millis() - this->last_modbus_byte_ > timeout) {
this->clear_rx_buffer_(LOG_STR("timeout after partial response"), true);
}
// If we're past the send_wait_time timeout and response buffer doesn't have the start of the expected response
if (this->waiting_for_response_ != 0 &&
millis() - this->last_send_ > this->last_send_tx_offset_ + this->send_wait_time_ &&
(this->rx_buffer_.empty() || this->rx_buffer_[0] != this->waiting_for_response_)) {
ESP_LOGW(TAG, "Stop waiting for response from %" PRIu8 " %" PRIu32 "ms after last send",
this->waiting_for_response_, millis() - this->last_send_);
this->waiting_for_response_ = 0;
}
// If there's no response pending and there's commands in the buffer
this->send_next_frame_();
}
bool Modbus::tx_blocked() {
const uint32_t now = millis();
// We block transmission in any of these case:
// 1. There are bytes in the UART Rx buffer
// 2. There are bytes in our Rx buffer
// 3. We're waiting for a response
// 4. The last sent byte isn't more than frame_delay ms ago (i.e. wait to tell receivers that our previous Tx is done)
// 5. The last received byte isn't more than frame_delay ms ago (i.e. wait to be sure there isn't more Rx coming)
// 6. If we're a client - also wait for the turnaround delay, to give the servers time to process the previous message
return this->available() || !this->rx_buffer_.empty() || (this->waiting_for_response_ != 0) ||
(now - this->last_send_ < this->last_send_tx_offset_ + this->frame_delay_ms_ +
(this->role == ModbusRole::CLIENT ? this->turnaround_delay_ms_ : 0)) ||
(now - this->last_modbus_byte_ <
this->frame_delay_ms_ + (this->role == ModbusRole::CLIENT ? this->turnaround_delay_ms_ : 0));
}
bool Modbus::tx_buffer_empty() { return this->tx_buffer_.empty(); }
void Modbus::receive_and_parse_modbus_bytes_() {
// Read all available bytes in batches to reduce UART call overhead.
size_t avail = this->available();
uint8_t buf[64];
@@ -28,33 +87,20 @@ void Modbus::loop() {
break;
}
avail -= to_read;
for (size_t i = 0; i < to_read; i++) {
if (this->parse_modbus_byte_(buf[i])) {
this->last_modbus_byte_ = now;
if (this->rx_buffer_.empty()) {
ESP_LOGV(TAG, "Received first byte %" PRIu8 " (0X%x) %" PRIu32 "ms after last send", buf[i], buf[i],
millis() - this->last_send_);
} else {
size_t at = this->rx_buffer_.size();
if (at > 0) {
ESP_LOGV(TAG, "Clearing buffer of %d bytes - parse failed", at);
this->rx_buffer_.clear();
}
ESP_LOGVV(TAG, "Received byte %" PRIu8 " (0X%x) %" PRIu32 "ms after last send", buf[i], buf[i],
millis() - this->last_send_);
}
}
}
if (now - this->last_modbus_byte_ > 50) {
size_t at = this->rx_buffer_.size();
if (at > 0) {
ESP_LOGV(TAG, "Clearing buffer of %d bytes - timeout", at);
this->rx_buffer_.clear();
}
// stop blocking new send commands after sent_wait_time_ ms after response received
if (now - this->last_send_ > send_wait_time_) {
if (waiting_for_response > 0) {
ESP_LOGV(TAG, "Stop waiting for response from %d", waiting_for_response);
// If the bytes in the rx buffer do not parse, clear out the buffer
if (!this->parse_modbus_byte_(buf[i])) {
this->clear_rx_buffer_(LOG_STR("parse failed"), true);
}
waiting_for_response = 0;
this->last_modbus_byte_ = millis();
}
}
}
@@ -63,7 +109,7 @@ bool Modbus::parse_modbus_byte_(uint8_t byte) {
size_t at = this->rx_buffer_.size();
this->rx_buffer_.push_back(byte);
const uint8_t *raw = &this->rx_buffer_[0];
ESP_LOGVV(TAG, "Modbus received Byte %d (0X%x)", byte, byte);
// Byte 0: modbus address (match all)
if (at == 0)
return true;
@@ -101,7 +147,7 @@ bool Modbus::parse_modbus_byte_(uint8_t byte) {
if (computed_crc != remote_crc)
return true;
ESP_LOGD(TAG, "Modbus user-defined function %02X found", function_code);
ESP_LOGD(TAG, "User-defined function %02X found", function_code);
} else {
// data starts at 2 and length is 4 for read registers commands
@@ -152,9 +198,19 @@ bool Modbus::parse_modbus_byte_(uint8_t byte) {
uint16_t remote_crc = uint16_t(raw[data_offset + data_len]) | (uint16_t(raw[data_offset + data_len + 1]) << 8);
if (computed_crc != remote_crc) {
if (this->disable_crc_) {
ESP_LOGD(TAG, "Modbus CRC Check failed, but ignored! %02X!=%02X", computed_crc, remote_crc);
ESP_LOGD(TAG, "CRC check failed %" PRIu32 "ms after last send; ignoring", millis() - this->last_send_);
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGVV(TAG, " (%02X != %02X) %s", computed_crc, remote_crc,
format_hex_pretty_to(hex_buf, this->rx_buffer_.data(), this->rx_buffer_.size()));
} else {
ESP_LOGW(TAG, "Modbus CRC Check failed! %02X!=%02X", computed_crc, remote_crc);
ESP_LOGW(TAG, "CRC check failed %" PRIu32 "ms after last send", millis() - this->last_send_);
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERY_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGVV(TAG, " (%02X != %02X) %s", computed_crc, remote_crc,
format_hex_pretty_to(hex_buf, this->rx_buffer_.data(), this->rx_buffer_.size()));
return false;
}
}
@@ -164,52 +220,101 @@ bool Modbus::parse_modbus_byte_(uint8_t byte) {
for (auto *device : this->devices_) {
if (device->address_ == address) {
found = true;
// Is it an error response?
if ((function_code & FUNCTION_CODE_EXCEPTION_MASK) == FUNCTION_CODE_EXCEPTION_MASK) {
ESP_LOGD(TAG, "Modbus error function code: 0x%X exception: %d", function_code, raw[2]);
if (waiting_for_response != 0) {
device->on_modbus_error(function_code & FUNCTION_CODE_MASK, raw[2]);
} else {
// Ignore modbus exception not related to a pending command
ESP_LOGD(TAG, "Ignoring Modbus error - not expecting a response");
}
continue;
}
if (this->role == ModbusRole::SERVER) {
if (function_code == ModbusFunctionCode::READ_HOLDING_REGISTERS ||
function_code == ModbusFunctionCode::READ_INPUT_REGISTERS) {
device->on_modbus_read_registers(function_code, uint16_t(data[1]) | (uint16_t(data[0]) << 8),
uint16_t(data[3]) | (uint16_t(data[2]) << 8));
continue;
}
if (function_code == ModbusFunctionCode::WRITE_SINGLE_REGISTER ||
function_code == ModbusFunctionCode::WRITE_MULTIPLE_REGISTERS) {
} else if (function_code == ModbusFunctionCode::WRITE_SINGLE_REGISTER ||
function_code == ModbusFunctionCode::WRITE_MULTIPLE_REGISTERS) {
device->on_modbus_write_registers(function_code, data);
continue;
}
} else { // We're a client
// Is it an error response?
if ((function_code & FUNCTION_CODE_EXCEPTION_MASK) == FUNCTION_CODE_EXCEPTION_MASK) {
uint8_t exception = raw[2];
ESP_LOGW(TAG,
"Error function code: 0x%X exception: %" PRIu8 ", address: %" PRIu8 ", %" PRIu32
"ms after last send",
function_code, exception, address, millis() - this->last_send_);
if (this->waiting_for_response_ == address) {
device->on_modbus_error(function_code & FUNCTION_CODE_MASK, exception);
} else {
// Ignore modbus exception not related to a pending command
ESP_LOGD(TAG, "Ignoring error - not expecting a response from %" PRIu8 "", address);
}
} else { // Not an error response
if (this->waiting_for_response_ == address) {
device->on_modbus_data(data);
} else {
// Ignore modbus response not related to a pending command
ESP_LOGW(TAG, "Ignoring response - not expecting a response from %" PRIu8 ", %" PRIu32 "ms after last send",
address, millis() - this->last_send_);
}
}
}
// fallthrough for other function codes
device->on_modbus_data(data);
}
}
waiting_for_response = 0;
if (!found) {
ESP_LOGW(TAG, "Got Modbus frame from unknown address 0x%02X! ", address);
if (!found && this->role == ModbusRole::CLIENT) {
ESP_LOGW(TAG, "Got frame from unknown address %" PRIu8 ", %" PRIu32 "ms after last send", address,
millis() - this->last_send_);
}
// reset buffer
ESP_LOGV(TAG, "Clearing buffer of %d bytes - parse succeeded", at);
this->rx_buffer_.clear();
this->clear_rx_buffer_(LOG_STR("parse succeeded"));
if (this->waiting_for_response_ == address)
this->waiting_for_response_ = 0;
return true;
}
void Modbus::send_next_frame_() {
if (this->tx_buffer_.empty())
return;
if (this->tx_blocked())
return;
const ModbusDeviceCommand &frame = this->tx_buffer_.front();
if (this->role == ModbusRole::CLIENT) {
this->waiting_for_response_ = frame.data.get()[0];
}
if (this->flow_control_pin_ != nullptr) {
this->flow_control_pin_->digital_write(true);
this->write_array(frame.data.get(), frame.size);
this->flush();
this->flow_control_pin_->digital_write(false);
this->last_send_tx_offset_ = 0;
} else {
this->write_array(frame.data.get(), frame.size);
this->last_send_tx_offset_ = frame.size * 11 * 1000 / this->parent_->get_baud_rate() + 1;
}
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGV(TAG, "Write: %s %" PRIu32 "ms after last send", format_hex_pretty_to(hex_buf, frame.data.get(), frame.size),
millis() - this->last_send_);
this->last_send_ = millis();
this->tx_buffer_.pop_front();
if (!this->tx_buffer_.empty()) {
ESP_LOGV(TAG, "Write queue contains %" PRIu32 " items.", this->tx_buffer_.size());
}
}
void Modbus::dump_config() {
ESP_LOGCONFIG(TAG,
"Modbus:\n"
" Send Wait Time: %d ms\n"
" Turnaround Time: %d ms\n"
" Frame Delay: %d ms\n"
" Long Rx Buffer Delay: %d ms\n"
" CRC Disabled: %s",
this->send_wait_time_, YESNO(this->disable_crc_));
this->send_wait_time_, this->turnaround_delay_ms_, this->frame_delay_ms_,
this->long_rx_buffer_delay_ms_, YESNO(this->disable_crc_));
LOG_PIN(" Flow Control Pin: ", this->flow_control_pin_);
}
float Modbus::get_setup_priority() const {
@@ -228,15 +333,6 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
return;
}
static constexpr size_t ADDR_SIZE = 1;
static constexpr size_t FC_SIZE = 1;
static constexpr size_t START_ADDR_SIZE = 2;
static constexpr size_t NUM_ENTITIES_SIZE = 2;
static constexpr size_t BYTE_COUNT_SIZE = 1;
static constexpr size_t MAX_PAYLOAD_SIZE = std::numeric_limits<uint8_t>::max();
static constexpr size_t CRC_SIZE = 2;
static constexpr size_t MAX_FRAME_SIZE =
ADDR_SIZE + FC_SIZE + START_ADDR_SIZE + NUM_ENTITIES_SIZE + BYTE_COUNT_SIZE + MAX_PAYLOAD_SIZE + CRC_SIZE;
uint8_t data[MAX_FRAME_SIZE];
size_t pos = 0;
@@ -259,29 +355,16 @@ void Modbus::send(uint8_t address, uint8_t function_code, uint16_t start_address
} else {
payload_len = 2; // Write single register or coil
}
if (payload_len + pos + 2 > MAX_FRAME_SIZE) { // Check if payload fits (accounting for CRC)
ESP_LOGE(TAG, "Payload too large to send: %d bytes", payload_len);
return;
}
for (int i = 0; i < payload_len; i++) {
data[pos++] = payload[i];
}
}
auto crc = crc16(data, pos);
data[pos++] = crc >> 0;
data[pos++] = crc >> 8;
if (this->flow_control_pin_ != nullptr)
this->flow_control_pin_->digital_write(true);
this->write_array(data, pos);
this->flush();
if (this->flow_control_pin_ != nullptr)
this->flow_control_pin_->digital_write(false);
waiting_for_response = address;
last_send_ = millis();
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGV(TAG, "Modbus write: %s", format_hex_pretty_to(hex_buf, data, pos));
this->queue_raw_(data, pos);
}
// Helper function for lambdas
@@ -290,23 +373,44 @@ void Modbus::send_raw(const std::vector<uint8_t> &payload) {
if (payload.empty()) {
return;
}
// Frame size: payload + CRC(2)
if (payload.size() + 2 > MAX_FRAME_SIZE) {
ESP_LOGE(TAG, "Attempted to send frame larger than max frame size of %d bytes", MAX_FRAME_SIZE);
return;
}
// Use stack buffer - Modbus frames are small and bounded
uint8_t data[MAX_FRAME_SIZE];
if (this->flow_control_pin_ != nullptr)
this->flow_control_pin_->digital_write(true);
std::memcpy(data, payload.data(), payload.size());
auto crc = crc16(payload.data(), payload.size());
this->write_array(payload);
this->write_byte(crc & 0xFF);
this->write_byte((crc >> 8) & 0xFF);
this->flush();
if (this->flow_control_pin_ != nullptr)
this->flow_control_pin_->digital_write(false);
waiting_for_response = payload[0];
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
this->queue_raw_(data, payload.size());
}
// Assume data and length is valid and append CRC, then queue for sending. Used internally to avoid unnecessary copying
// of data into vectors
void Modbus::queue_raw_(const uint8_t *data, uint16_t len) {
if (this->tx_buffer_.size() < MODBUS_TX_BUFFER_SIZE) {
this->tx_buffer_.emplace_back(data, len);
} else {
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_ERROR
char hex_buf[format_hex_pretty_size(MODBUS_MAX_LOG_BYTES)];
#endif
ESP_LOGV(TAG, "Modbus write raw: %s", format_hex_pretty_to(hex_buf, payload.data(), payload.size()));
last_send_ = millis();
ESP_LOGE(TAG, "Write buffer full, dropped: %s", format_hex_pretty_to(hex_buf, data, len));
}
}
void Modbus::clear_rx_buffer_(const LogString *reason, bool warn) {
size_t at = this->rx_buffer_.size();
if (at > 0) {
if (warn) {
ESP_LOGW(TAG, "Clearing buffer of %" PRIu32 " bytes - %s %" PRIu32 "ms after last send", at, LOG_STR_ARG(reason),
millis() - this->last_send_);
} else {
ESP_LOGV(TAG, "Clearing buffer of %" PRIu32 " bytes - %s %" PRIu32 "ms after last send", at, LOG_STR_ARG(reason),
millis() - this->last_send_);
}
this->rx_buffer_.clear();
}
}
} // namespace modbus

View File

@@ -5,11 +5,16 @@
#include "esphome/components/modbus/modbus_definitions.h"
#include <cstring>
#include <memory>
#include <vector>
#include <queue>
namespace esphome {
namespace modbus {
static constexpr uint16_t MODBUS_TX_BUFFER_SIZE = 15;
enum ModbusRole {
CLIENT,
SERVER,
@@ -17,6 +22,19 @@ enum ModbusRole {
class ModbusDevice;
struct ModbusDeviceCommand {
// Frame with exact-size allocation to avoid std::vector overhead
std::unique_ptr<uint8_t[]> data;
uint16_t size; // Modbus RTU max is 256 bytes
ModbusDeviceCommand(const uint8_t *src, uint16_t len) : data(std::make_unique<uint8_t[]>(len + 2)), size(len + 2) {
std::memcpy(this->data.get(), src, len);
auto crc = crc16(data.get(), len);
data[len + 0] = crc >> 0;
data[len + 1] = crc >> 8;
}
};
class Modbus : public uart::UARTDevice, public Component {
public:
Modbus() = default;
@@ -30,28 +48,45 @@ class Modbus : public uart::UARTDevice, public Component {
void register_device(ModbusDevice *device) { this->devices_.push_back(device); }
float get_setup_priority() const override;
bool tx_buffer_empty();
bool tx_blocked();
void send(uint8_t address, uint8_t function_code, uint16_t start_address, uint16_t number_of_entities,
uint8_t payload_len = 0, const uint8_t *payload = nullptr);
void send_raw(const std::vector<uint8_t> &payload);
void set_role(ModbusRole role) { this->role = role; }
void set_flow_control_pin(GPIOPin *flow_control_pin) { this->flow_control_pin_ = flow_control_pin; }
uint8_t waiting_for_response{0};
void set_send_wait_time(uint16_t time_in_ms) { send_wait_time_ = time_in_ms; }
void set_disable_crc(bool disable_crc) { disable_crc_ = disable_crc; }
void set_send_wait_time(uint16_t time_in_ms) { this->send_wait_time_ = time_in_ms; }
void set_turnaround_time(uint16_t time_in_ms) { this->turnaround_delay_ms_ = time_in_ms; }
void set_disable_crc(bool disable_crc) { this->disable_crc_ = disable_crc; }
ModbusRole role;
protected:
GPIOPin *flow_control_pin_{nullptr};
bool parse_modbus_byte_(uint8_t byte);
uint16_t send_wait_time_{250};
bool disable_crc_;
std::vector<uint8_t> rx_buffer_;
void receive_and_parse_modbus_bytes_();
void clear_rx_buffer_(const LogString *reason, bool warn = false);
void send_next_frame_();
void queue_raw_(const uint8_t *data, uint16_t len);
uint32_t last_modbus_byte_{0};
uint32_t last_send_{0};
uint32_t last_send_tx_offset_{0};
uint16_t frame_delay_ms_{5};
uint16_t long_rx_buffer_delay_ms_{0};
uint16_t send_wait_time_{250};
uint16_t turnaround_delay_ms_{100};
uint8_t waiting_for_response_{0};
bool disable_crc_{false};
GPIOPin *flow_control_pin_{nullptr};
std::vector<uint8_t> rx_buffer_;
std::vector<ModbusDevice *> devices_;
// std::deque is appropriate here since we need a FIFO buffer, and we can't know ahead of time how many
// requests will be queued. Each modbus component may queue multiple requests, and the sequence of scheduling
// may change at run time.
std::deque<ModbusDeviceCommand> tx_buffer_;
};
class ModbusDevice {
@@ -76,7 +111,9 @@ class ModbusDevice {
this->send_raw(error_response);
}
// If more than one device is connected block sending a new command before a response is received
bool waiting_for_response() { return parent_->waiting_for_response != 0; }
ESPDEPRECATED("Use ready_for_immediate_send() instead. Removed in 2026.9.0", "2026.3.0")
bool waiting_for_response() { return !ready_for_immediate_send(); }
bool ready_for_immediate_send() { return parent_->tx_buffer_empty() && !parent_->tx_blocked(); }
protected:
friend Modbus;

View File

@@ -81,6 +81,8 @@ const uint8_t MAX_NUM_OF_REGISTERS_TO_WRITE = 123; // 0x7B
// 6.3 03 (0x03) Read Holding Registers
// 6.4 04 (0x04) Read Input Registers
const uint8_t MAX_NUM_OF_REGISTERS_TO_READ = 125; // 0x7D
static constexpr uint16_t MAX_FRAME_SIZE = 256;
/// End of Modbus definitions
} // namespace modbus
} // namespace esphome

View File

@@ -48,6 +48,7 @@ CONF_SERVER_REGISTERS = "server_registers"
MULTI_CONF = True
modbus_controller_ns = cg.esphome_ns.namespace("modbus_controller")
modbus_ns = cg.esphome_ns.namespace("modbus")
ModbusController = modbus_controller_ns.class_(
"ModbusController", cg.PollingComponent, modbus.ModbusDevice
)
@@ -56,7 +57,7 @@ SensorItem = modbus_controller_ns.struct("SensorItem")
ServerCourtesyResponse = modbus_controller_ns.struct("ServerCourtesyResponse")
ServerRegister = modbus_controller_ns.struct("ServerRegister")
ModbusFunctionCode_ns = modbus_controller_ns.namespace("ModbusFunctionCode")
ModbusFunctionCode_ns = modbus_ns.namespace("ModbusFunctionCode")
ModbusFunctionCode = ModbusFunctionCode_ns.enum("ModbusFunctionCode")
MODBUS_FUNCTION_CODE = {
"read_coils": ModbusFunctionCode.READ_COILS,

View File

@@ -18,7 +18,7 @@ void ModbusController::setup() { this->create_register_ranges_(); }
bool ModbusController::send_next_command_() {
uint32_t last_send = millis() - this->last_command_timestamp_;
if ((last_send > this->command_throttle_) && !waiting_for_response() && !this->command_queue_.empty()) {
if ((last_send > this->command_throttle_) && this->ready_for_immediate_send() && !this->command_queue_.empty()) {
auto &command = this->command_queue_.front();
// remove from queue if command was sent too often

View File

@@ -1,3 +1,5 @@
modbus:
id: mod_bus1
flow_control_pin: ${flow_control_pin}
send_wait_time: 500ms
turnaround_time: 100ms

View File

@@ -71,6 +71,7 @@ RESPONSE_SCHEMA = cv.Schema(
{
cv.Required(CONF_EXPECT_TX): [cv.hex_uint8_t],
cv.Required(CONF_INJECT_RX): [cv.hex_uint8_t],
cv.Optional(CONF_DELAY, default="0ms"): cv.positive_time_period_milliseconds,
}
)
@@ -151,7 +152,8 @@ async def to_code(config):
for response in config[CONF_RESPONSES]:
tx_data = response[CONF_EXPECT_TX]
rx_data = response[CONF_INJECT_RX]
cg.add(var.add_response(tx_data, rx_data))
delay_ms = response[CONF_DELAY]
cg.add(var.add_response(tx_data, rx_data, delay_ms))
for periodic in config[CONF_PERIODIC_RX]:
data = periodic[CONF_DATA]

View File

@@ -36,8 +36,8 @@ void MockUartComponent::loop() {
// component (e.g., LD2410) a chance to process each batch independently.
if (this->injection_index_ < this->injections_.size()) {
auto &injection = this->injections_[this->injection_index_];
uint32_t target_time = this->scenario_start_ms_ + this->cumulative_delay_ms_ + injection.delay_ms;
if (now >= target_time) {
uint32_t total_delay = this->cumulative_delay_ms_ + injection.delay_ms;
if (now - this->scenario_start_ms_ >= total_delay) {
ESP_LOGD(TAG, "Injecting %zu RX bytes (injection %u)", injection.rx_data.size(), this->injection_index_);
this->inject_to_rx_buffer(injection.rx_data);
this->cumulative_delay_ms_ += injection.delay_ms;
@@ -52,6 +52,15 @@ void MockUartComponent::loop() {
periodic.last_inject_ms = now;
}
}
// Process delayed responses
for (auto &response : this->responses_) {
if (response.delay_ms > 0 && response.last_match_ms > 0 && now - response.last_match_ms >= response.delay_ms) {
ESP_LOGD(TAG, "Injecting %zu RX bytes for delayed response", response.inject_rx.size());
this->inject_to_rx_buffer(response.inject_rx);
response.last_match_ms = 0; // Reset to prevent repeated injection
}
}
}
void MockUartComponent::start_scenario() {
@@ -149,8 +158,9 @@ void MockUartComponent::add_injection(const std::vector<uint8_t> &rx_data, uint3
this->injections_.push_back({rx_data, delay_ms});
}
void MockUartComponent::add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx) {
this->responses_.push_back({expect_tx, inject_rx});
void MockUartComponent::add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx,
uint32_t delay_ms) {
this->responses_.push_back({expect_tx, inject_rx, delay_ms, 0});
}
void MockUartComponent::add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms) {
@@ -166,7 +176,13 @@ void MockUartComponent::try_match_response_() {
size_t offset = this->tx_buffer_.size() - response.expect_tx.size();
if (std::equal(response.expect_tx.begin(), response.expect_tx.end(), this->tx_buffer_.begin() + offset)) {
ESP_LOGD(TAG, "TX match found, injecting %zu RX bytes", response.inject_rx.size());
this->inject_to_rx_buffer(response.inject_rx);
if (response.delay_ms > 0) {
ESP_LOGD(TAG, "Delaying response by %u ms", response.delay_ms);
// Schedule the response injection as a future injection
response.last_match_ms = App.get_loop_component_start_time();
} else {
this->inject_to_rx_buffer(response.inject_rx);
}
this->tx_buffer_.clear();
return;
}

View File

@@ -34,7 +34,8 @@ class MockUartComponent : public uart::UARTComponent, public Component {
// Scenario configuration - called from generated code
void add_injection(const std::vector<uint8_t> &rx_data, uint32_t delay_ms);
void add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx);
void add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx,
uint32_t delay_ms = 0);
void add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms);
void start_scenario();
@@ -64,6 +65,8 @@ class MockUartComponent : public uart::UARTComponent, public Component {
struct Response {
std::vector<uint8_t> expect_tx;
std::vector<uint8_t> inject_rx;
uint32_t delay_ms;
uint32_t last_match_ms{0};
};
std::vector<Response> responses_;
std::vector<uint8_t> tx_buffer_;

View File

@@ -25,20 +25,64 @@ uart_mock:
auto_start: false
debug:
responses:
- expect_tx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 1 on device 1
- expect_tx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 3 on device 1 (basic_register)
inject_rx: [0x01, 0x03, 0x02, 0x01, 0x03, 0xF9, 0xD5] # Return value 0x0103 (hex) = 259 (dec)
- expect_tx: [0x01, 0x03, 0x00, 0x05, 0x00, 0x01, 0x94, 0x0B] # Read holding register 5 on device 1 (delayed_response)
delay: 100ms # Shorter than modbus send_wait_time of 200ms, should succeed
inject_rx: [0x01, 0x03, 0x02, 0x00, 0xFF, 0xF8, 0x04] # Return value 0x00FF (hex) = 255 (dec)
- expect_tx: [0x02, 0x03, 0x00, 0x07, 0x00, 0x01, 0x35, 0xF8] # Read holding register 7 on device 2 (late_response)
delay: 300ms # Longer than modbus send_wait_time of 200ms, should cause timeout
inject_rx: [0x02, 0x03, 0x02, 0x00, 0xF0, 0xFC, 0x00] # Return value 0x00F0 (hex) = 240 (dec)
- expect_tx: [0x03, 0x03, 0x00, 0x09, 0x00, 0x01, 0x55, 0xEA] # Read holding register 9 on device 3 (no_response)
inject_rx: [] # No response, should cause timeout
- expect_tx: [0x01, 0x03, 0x00, 0x0A, 0x00, 0x01, 0xA4, 0x08] # Read holding register A on device 1 (exception_response)
inject_rx: [0x01, 0x83, 0x02, 0xC0, 0xF1] # Exception response with code 2 (illegal data address)
modbus:
uart_id: virtual_uart_dev
send_wait_time: 200ms
turnaround_time: 10ms
modbus_controller:
address: 1
- address: 1
id: modbus_controller_ok
max_cmd_retries: 0
update_interval: 1s
- address: 2
id: modbus_controller_slow
max_cmd_retries: 0
update_interval: 1s
- address: 3
id: modbus_controller_offline
max_cmd_retries: 0
update_interval: 1s
sensor:
- platform: modbus_controller
name: "basic_register"
address: 0x03
register_type: holding
modbus_controller_id: modbus_controller_ok
- platform: modbus_controller
name: "delayed_response"
address: 0x05
register_type: holding
modbus_controller_id: modbus_controller_ok
- platform: modbus_controller
name: "late_response"
address: 0x07
register_type: holding
modbus_controller_id: modbus_controller_slow
- platform: modbus_controller
name: "no_response"
address: 0x09
register_type: holding
modbus_controller_id: modbus_controller_offline
- platform: modbus_controller
name: "exception_response"
address: 0x0A
register_type: holding
modbus_controller_id: modbus_controller_ok
button:
- platform: template

View File

@@ -46,10 +46,12 @@ uart_mock:
modbus:
uart_id: virtual_uart_dev
turnaround_time: 10ms
sensor:
- platform: sdm_meter
address: 2
update_interval: 1s
phase_a:
voltage:
name: sdm_voltage

View File

@@ -39,9 +39,17 @@ async def test_uart_mock_modbus(
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"basic_register": [],
"delayed_response": [],
"late_response": [],
"no_response": [],
"exception_response": [],
}
basic_register_changed = loop.create_future()
delayed_response_changed = loop.create_future()
late_response_changed = loop.create_future()
no_response_changed = loop.create_future()
exception_response_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
@@ -54,6 +62,23 @@ async def test_uart_mock_modbus(
and not basic_register_changed.done()
):
basic_register_changed.set_result(True)
elif (
sensor_name == "delayed_response"
and state.state == 255.0
and not delayed_response_changed.done()
):
delayed_response_changed.set_result(True)
elif (
sensor_name == "late_response" and not late_response_changed.done()
):
late_response_changed.set_result(True)
elif sensor_name == "no_response" and not no_response_changed.done():
no_response_changed.set_result(True)
elif (
sensor_name == "exception_response"
and not exception_response_changed.done()
):
exception_response_changed.set_result(True)
async with (
run_compiled(yaml_config),
@@ -79,20 +104,52 @@ async def test_uart_mock_modbus(
assert start_btn is not None, "Start Scenario button not found"
client.button_command(start_btn.key)
try:
await asyncio.wait_for(delayed_response_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for delayed_response change. Received sensor states:\n"
f" delayed_response: {sensor_states['delayed_response']}\n"
)
try:
await asyncio.wait_for(late_response_changed, timeout=2.0)
pytest.fail(
f"late_response change should not have been triggered, but was. Received sensor states:\n"
f" late_response: {sensor_states['late_response']}\n"
)
except TimeoutError:
pass # Expected timeout since we never inject a response for late_response
try:
await asyncio.wait_for(no_response_changed, timeout=2.0)
pytest.fail(
f"no_response change should not have been triggered, but was. Received sensor states:\n"
f" no_response: {sensor_states['no_response']}\n"
)
except TimeoutError:
pass # Expected timeout since we never inject a response for no_response
# Wait for basic register to be updated with successful parse
try:
await asyncio.wait_for(basic_register_changed, timeout=15.0)
await asyncio.wait_for(basic_register_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Basic Register change. Received sensor states:\n"
f" basic_register: {sensor_states['basic_register']}\n"
)
try:
await asyncio.wait_for(exception_response_changed, timeout=2.0)
pytest.fail(
f"exception_response change should not have been triggered, but was. Received sensor states:\n"
f" exception_response: {sensor_states['exception_response']}\n"
)
except TimeoutError:
pass
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="There is a bug in UART which will timeout for long responses."
)
async def test_uart_mock_modbus_timing(
yaml_config: str,
run_compiled: RunCompiledFunction,
@@ -155,7 +212,7 @@ async def test_uart_mock_modbus_timing(
# Wait for voltage to be updated with successful parse
try:
await asyncio.wait_for(voltage_changed, timeout=15.0)
await asyncio.wait_for(voltage_changed, timeout=2.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for SDM voltage change. Received sensor states:\n"