Merge pull request #14904 from esphome/bump-2026.3.0b4

2026.3.0b4
This commit is contained in:
Jesse Hills
2026-03-18 16:21:28 +13:00
committed by GitHub
23 changed files with 313 additions and 119 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2026.3.0b3
PROJECT_NUMBER = 2026.3.0b4
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -442,8 +442,12 @@ class ProtoMessage {
virtual const char *message_name() const { return "unknown"; }
#endif
#ifndef USE_HOST
protected:
#endif
// Non-virtual destructor is protected to prevent polymorphic deletion.
// On host platform, made public to allow value-initialization of std::array
// members (e.g. DeviceInfoResponse::devices) without clang errors.
~ProtoMessage() = default;
};

View File

@@ -186,8 +186,8 @@ async def to_code_base(config):
cg.add_library("SPI", None)
cg.add_library(
"BME68x Sensor library",
"1.3.40408",
"https://github.com/boschsensortec/Bosch-BME68x-Library",
None,
"https://github.com/boschsensortec/Bosch-BME68x-Library#v1.3.40408",
)
cg.add_library(
"BSEC2 Software Library",

View File

@@ -575,8 +575,9 @@ template<typename... Args> void enqueue_ble_event(Args... args) {
load_ble_event(event, args...);
// Push the event to the queue
// Push always succeeds: pool is sized to queue capacity (N-1), so if
// allocate() returned non-null, the queue is guaranteed to have room.
global_ble->ble_events_.push(event);
// Push always succeeds because we're the only producer and the pool ensures we never exceed queue size
}
// Explicit template instantiations for the friend function

View File

@@ -221,7 +221,13 @@ class ESP32BLE : public Component {
// Large objects (size depends on template parameters, but typically aligned to 4 bytes)
esphome::LockFreeQueue<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_events_;
esphome::EventPool<BLEEvent, MAX_BLE_QUEUE_SIZE> ble_event_pool_;
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements (one slot distinguishes full from empty).
// This guarantees allocate() returns nullptr before push() can fail, which:
// 1. Prevents leaking a pool slot (the Nth allocate succeeds but push fails)
// 2. Avoids needing release() on the producer path after a failed push(),
// preserving the SPSC contract on the pool's internal free list
esphome::EventPool<BLEEvent, MAX_BLE_QUEUE_SIZE - 1> ble_event_pool_;
// 4-byte aligned members
#ifdef USE_ESP32_BLE_ADVERTISING

View File

@@ -16,13 +16,9 @@ BLECharacteristic::~BLECharacteristic() {
for (auto *descriptor : this->descriptors_) {
delete descriptor; // NOLINT(cppcoreguidelines-owning-memory)
}
vSemaphoreDelete(this->set_value_lock_);
}
BLECharacteristic::BLECharacteristic(const ESPBTUUID uuid, uint32_t properties) : uuid_(uuid) {
this->set_value_lock_ = xSemaphoreCreateBinary();
xSemaphoreGive(this->set_value_lock_);
this->properties_ = (esp_gatt_char_prop_t) 0;
this->set_broadcast_property((properties & PROPERTY_BROADCAST) != 0);
@@ -35,11 +31,7 @@ BLECharacteristic::BLECharacteristic(const ESPBTUUID uuid, uint32_t properties)
void BLECharacteristic::set_value(ByteBuffer buffer) { this->set_value(buffer.get_data()); }
void BLECharacteristic::set_value(std::vector<uint8_t> &&buffer) {
xSemaphoreTake(this->set_value_lock_, 0L);
this->value_ = std::move(buffer);
xSemaphoreGive(this->set_value_lock_);
}
void BLECharacteristic::set_value(std::vector<uint8_t> &&buffer) { this->value_ = std::move(buffer); }
void BLECharacteristic::set_value(std::initializer_list<uint8_t> data) {
this->set_value(std::vector<uint8_t>(data)); // Delegate to move overload

View File

@@ -16,8 +16,6 @@
#include <esp_gattc_api.h>
#include <esp_gatts_api.h>
#include <esp_bt_defs.h>
#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
namespace esphome {
namespace esp32_ble_server {
@@ -84,8 +82,6 @@ class BLECharacteristic {
uint16_t value_read_offset_{0};
std::vector<uint8_t> value_;
SemaphoreHandle_t set_value_lock_;
std::vector<BLEDescriptor *> descriptors_;
struct ClientNotificationEntry {

View File

@@ -87,7 +87,8 @@ void on_send_report(const uint8_t *mac_addr, esp_now_send_status_t status)
// Push the packet to the queue
global_esp_now->receive_packet_queue_.push(packet);
// Push always because we're the only producer and the pool ensures we never exceed queue size
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
// Wake main loop immediately to process ESP-NOW send event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
@@ -109,7 +110,8 @@ void on_data_received(const esp_now_recv_info_t *info, const uint8_t *data, int
// Push the packet to the queue
global_esp_now->receive_packet_queue_.push(packet);
// Push always because we're the only producer and the pool ensures we never exceed queue size
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
// Wake main loop immediately to process ESP-NOW receive event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)

View File

@@ -163,10 +163,14 @@ class ESPNowComponent : public Component {
uint8_t own_address_[ESP_NOW_ETH_ALEN]{0};
LockFreeQueue<ESPNowPacket, MAX_ESP_NOW_RECEIVE_QUEUE_SIZE> receive_packet_queue_{};
EventPool<ESPNowPacket, MAX_ESP_NOW_RECEIVE_QUEUE_SIZE> receive_packet_pool_{};
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements. This guarantees allocate() returns nullptr
// before push() can fail, preventing a pool slot leak.
EventPool<ESPNowPacket, MAX_ESP_NOW_RECEIVE_QUEUE_SIZE - 1> receive_packet_pool_{};
LockFreeQueue<ESPNowSendPacket, MAX_ESP_NOW_SEND_QUEUE_SIZE> send_packet_queue_{};
EventPool<ESPNowSendPacket, MAX_ESP_NOW_SEND_QUEUE_SIZE> send_packet_pool_{};
// Pool sized to queue capacity (SIZE-1) — see receive_packet_pool_ comment.
EventPool<ESPNowSendPacket, MAX_ESP_NOW_SEND_QUEUE_SIZE - 1> send_packet_pool_{};
ESPNowSendPacket *current_send_packet_{nullptr}; // Currently sending packet, nullptr if none
uint8_t wifi_channel_{0};

View File

@@ -82,10 +82,16 @@ bool MQTTBackendESP32::initialize_() {
void MQTTBackendESP32::loop() {
// process new events
// handle only 1 message per loop iteration
if (!mqtt_events_.empty()) {
auto &event = mqtt_events_.front();
mqtt_event_handler_(event);
mqtt_events_.pop();
Event *event = this->mqtt_event_queue_.pop();
if (event != nullptr) {
this->mqtt_event_handler_(*event);
this->mqtt_event_pool_.release(event);
}
// Log dropped inbound events (check is cheap - single atomic load in common case)
uint16_t inbound_dropped = this->mqtt_event_queue_.get_and_reset_dropped_count();
if (inbound_dropped > 0) {
ESP_LOGW(TAG, "Dropped %u inbound MQTT events", inbound_dropped);
}
#if defined(USE_MQTT_IDF_ENQUEUE)
@@ -183,10 +189,18 @@ void MQTTBackendESP32::mqtt_event_handler_(const Event &event) {
void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
void *event_data) {
MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
// queue event to decouple processing
// queue event to decouple processing from ESP-IDF MQTT task to main loop
if (instance) {
auto event = *static_cast<esp_mqtt_event_t *>(event_data);
instance->mqtt_events_.emplace(event);
auto *event = instance->mqtt_event_pool_.allocate();
if (event == nullptr) {
// Pool exhausted, drop event (counted via queue's dropped counter)
instance->mqtt_event_queue_.increment_dropped_count();
return;
}
event->populate(*static_cast<esp_mqtt_event_t *>(event_data));
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
instance->mqtt_event_queue_.push(event);
// Wake main loop immediately to process MQTT event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
@@ -226,14 +240,14 @@ void MQTTBackendESP32::esphome_mqtt_task(void *params) {
break;
}
}
this_mqtt->mqtt_event_pool_.release(elem);
this_mqtt->mqtt_outbound_pool_.release(elem);
}
}
}
bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
size_t len) {
auto *elem = this->mqtt_event_pool_.allocate();
auto *elem = this->mqtt_outbound_pool_.allocate();
if (!elem) {
// Queue is full - increment counter but don't log immediately.
@@ -253,7 +267,7 @@ bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos,
// Use the helper to allocate and copy data
if (!elem->set_data(topic, payload, len)) {
// Allocation failed, return elem to pool
this->mqtt_event_pool_.release(elem);
this->mqtt_outbound_pool_.release(elem);
// Increment counter without logging to avoid cascade effect during memory pressure
this->mqtt_queue_.increment_dropped_count();
return false;

View File

@@ -5,7 +5,6 @@
#ifdef USE_ESP32
#include <string>
#include <queue>
#include <cstring>
#include <mqtt_client.h>
#include <freertos/FreeRTOS.h>
@@ -18,32 +17,39 @@
namespace esphome::mqtt {
struct Event {
esp_mqtt_event_id_t event_id;
esp_mqtt_event_id_t event_id{};
std::vector<char> data;
int total_data_len;
int current_data_offset;
int total_data_len{0};
int current_data_offset{0};
std::string topic;
int msg_id;
bool retain;
int qos;
bool dup;
bool session_present;
esp_mqtt_error_codes_t error_handle;
int msg_id{0};
bool retain{false};
int qos{0};
bool dup{false};
bool session_present{false};
esp_mqtt_error_codes_t error_handle{};
// Construct from esp_mqtt_event_t
// Any pointer values that are unsafe to keep are converted to safe copies
Event(const esp_mqtt_event_t &event)
: event_id(event.event_id),
data(event.data, event.data + event.data_len),
total_data_len(event.total_data_len),
current_data_offset(event.current_data_offset),
topic(event.topic, event.topic_len),
msg_id(event.msg_id),
retain(event.retain),
qos(event.qos),
dup(event.dup),
session_present(event.session_present),
error_handle(*event.error_handle) {}
// Populate from esp_mqtt_event_t
// Copies pointer-based data to owned storage for safe cross-thread transfer
void populate(const esp_mqtt_event_t &event) {
this->event_id = event.event_id;
this->data.assign(event.data, event.data + event.data_len);
this->total_data_len = event.total_data_len;
this->current_data_offset = event.current_data_offset;
this->topic.assign(event.topic, event.topic_len);
this->msg_id = event.msg_id;
this->retain = event.retain;
this->qos = event.qos;
this->dup = event.dup;
this->session_present = event.session_present;
this->error_handle = *event.error_handle;
}
// Release owned resources for pool reuse (keeps allocated capacity for efficiency)
void release() {
this->data.clear();
this->topic.clear();
}
};
enum MqttQueueTypeT : uint8_t {
@@ -118,7 +124,8 @@ class MQTTBackendESP32 final : public MQTTBackend {
static constexpr size_t TASK_STACK_SIZE = 3072;
static constexpr size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations
static constexpr ssize_t TASK_PRIORITY = 5;
static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
static constexpr uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
static constexpr uint8_t MQTT_EVENT_QUEUE_LENGTH = 32; // Inbound events from broker
void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
@@ -251,7 +258,8 @@ class MQTTBackendESP32 final : public MQTTBackend {
bool skip_cert_cn_check_{false};
#if defined(USE_MQTT_IDF_ENQUEUE)
static void esphome_mqtt_task(void *params);
EventPool<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_event_pool_;
// Pool sized to queue capacity (SIZE-1) — see mqtt_event_pool_ comment.
EventPool<struct QueueElement, MQTT_QUEUE_LENGTH - 1> mqtt_outbound_pool_;
NotifyingLockFreeQueue<struct QueueElement, MQTT_QUEUE_LENGTH> mqtt_queue_;
TaskHandle_t task_handle_{nullptr};
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL,
@@ -266,7 +274,14 @@ class MQTTBackendESP32 final : public MQTTBackend {
CallbackManager<on_message_callback_t> on_message_;
CallbackManager<on_publish_user_callback_t> on_publish_;
std::string cached_topic_;
std::queue<Event> mqtt_events_;
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements (one slot distinguishes full from empty).
// This guarantees allocate() returns nullptr before push() can fail, which:
// 1. Prevents leaking a pool slot (the Nth allocate succeeds but push fails)
// 2. Avoids needing release() on the producer path after a failed push(),
// preserving the SPSC contract on the pool's internal free list
EventPool<Event, MQTT_EVENT_QUEUE_LENGTH - 1> mqtt_event_pool_;
LockFreeQueue<Event, MQTT_EVENT_QUEUE_LENGTH> mqtt_event_queue_;
#if defined(USE_MQTT_IDF_ENQUEUE)
uint32_t last_dropped_log_time_{0};

View File

@@ -417,7 +417,7 @@ void SpeakerMediaPlayer::loop() {
this->media_playlist_.pop_front();
}
// Only delay starting playback if moving on the next playlist item or repeating the current item
timeout_ms = this->announcement_playlist_delay_ms_;
timeout_ms = this->media_playlist_delay_ms_;
}
if (!this->media_playlist_.empty()) {
PlaylistItem playlist_item = this->media_playlist_.front();

View File

@@ -26,16 +26,13 @@ void USBCDCACMInstance::queue_line_state_event(bool dtr, bool rts) {
event->data.line_state.dtr = dtr;
event->data.line_state.rts = rts;
if (!this->event_queue_.push(event)) {
ESP_LOGW(TAG, "Event queue full, line state event dropped (itf=%d)", this->itf_);
// Return event to pool since we couldn't queue it
this->event_pool_.release(event);
} else {
// Wake main loop immediately to process event
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
this->event_queue_.push(event);
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
App.wake_loop_threadsafe();
App.wake_loop_threadsafe();
#endif
}
}
void USBCDCACMInstance::queue_line_coding_event(uint32_t bit_rate, uint8_t stop_bits, uint8_t parity,
@@ -53,16 +50,13 @@ void USBCDCACMInstance::queue_line_coding_event(uint32_t bit_rate, uint8_t stop_
event->data.line_coding.parity = parity;
event->data.line_coding.data_bits = data_bits;
if (!this->event_queue_.push(event)) {
ESP_LOGW(TAG, "Event queue full, line coding event dropped (itf=%d)", this->itf_);
// Return event to pool since we couldn't queue it
this->event_pool_.release(event);
} else {
// Wake main loop immediately to process event
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
this->event_queue_.push(event);
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
App.wake_loop_threadsafe();
App.wake_loop_threadsafe();
#endif
}
}
void USBCDCACMInstance::process_events_() {

View File

@@ -102,7 +102,11 @@ class USBCDCACMInstance : public uart::UARTComponent, public Parented<USBCDCACMC
LineStateCallback line_state_callback_{nullptr};
// Lock-free queue and event pool for cross-task event passing
EventPool<CDCEvent, EVENT_QUEUE_SIZE> event_pool_;
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements. This guarantees allocate() returns nullptr
// before push() can fail, preventing both a pool slot leak and an SPSC
// violation on the pool's internal free list.
EventPool<CDCEvent, EVENT_QUEUE_SIZE - 1> event_pool_;
LockFreeQueue<CDCEvent, EVENT_QUEUE_SIZE> event_queue_;
};

View File

@@ -144,7 +144,10 @@ class USBClient : public Component {
// Lock-free event queue and pool for USB task to main loop communication
// Must be public for access from static callbacks
LockFreeQueue<UsbEvent, USB_EVENT_QUEUE_SIZE> event_queue;
EventPool<UsbEvent, USB_EVENT_QUEUE_SIZE> event_pool;
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements. This guarantees allocate() returns nullptr
// before push() can fail, preventing a pool slot leak.
EventPool<UsbEvent, USB_EVENT_QUEUE_SIZE - 1> event_pool;
protected:
// Process USB events from the queue. Returns true if any work was done.

View File

@@ -193,7 +193,8 @@ static void client_event_cb(const usb_host_client_event_msg_t *event_msg, void *
return;
}
// Push to lock-free queue (always succeeds since pool size == queue size)
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
client->event_queue.push(event);
// Re-enable component loop to process the queued event

View File

@@ -160,11 +160,9 @@ void USBUartChannel::write_array(const uint8_t *data, size_t len) {
size_t chunk_len = std::min(len, UsbOutputChunk::MAX_CHUNK_SIZE);
memcpy(chunk->data, data, chunk_len);
chunk->length = static_cast<uint8_t>(chunk_len);
if (!this->output_queue_.push(chunk)) {
this->output_pool_.release(chunk);
ESP_LOGE(TAG, "Output queue full - lost %zu bytes", len);
break;
}
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
this->output_queue_.push(chunk);
data += chunk_len;
len -= chunk_len;
}
@@ -320,7 +318,8 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
chunk->channel = channel;
// Push to lock-free queue for main loop processing
// Push always succeeds because pool size == queue size
// Push always succeeds: pool is sized to queue capacity (SIZE-1), so if
// allocate() returned non-null, the queue cannot be full.
this->usb_data_queue_.push(chunk);
// Re-enable component loop to process the queued data

View File

@@ -158,7 +158,10 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon
// Larger structures first (8+ bytes)
RingBuffer input_buffer_;
LockFreeQueue<UsbOutputChunk, USB_OUTPUT_CHUNK_COUNT> output_queue_;
EventPool<UsbOutputChunk, USB_OUTPUT_CHUNK_COUNT> output_pool_;
// Pool sized to queue capacity (SIZE-1) because LockFreeQueue<T,N> is a ring
// buffer that holds N-1 elements. This guarantees allocate() returns nullptr
// before push() can fail, preventing a pool slot leak.
EventPool<UsbOutputChunk, USB_OUTPUT_CHUNK_COUNT - 1> output_pool_;
std::function<void()> rx_callback_{};
CdcEps cdc_dev_{};
StringRef debug_prefix_{};
@@ -190,7 +193,8 @@ class USBUartComponent : public usb_host::USBClient {
// Lock-free data transfer from USB task to main loop
static constexpr int USB_DATA_QUEUE_SIZE = 32;
LockFreeQueue<UsbDataChunk, USB_DATA_QUEUE_SIZE> usb_data_queue_;
EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE> chunk_pool_;
// Pool sized to queue capacity (SIZE-1) — see USBUartChannel::output_pool_ comment.
EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE - 1> chunk_pool_;
protected:
std::vector<USBUartChannel *> channels_{};

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2026.3.0b3"
__version__ = "2026.3.0b4"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -100,6 +100,14 @@ class EntityBase {
// Get whether this Entity should be hidden outside ESPHome
bool is_internal() const { return this->flags_.internal; }
// Deprecated: Calling set_internal() at runtime is undefined behavior. Components and clients
// are NOT notified of the change, the flag may have already been read during setup, and there
// is NO guarantee any consumer will observe the new value. Use the 'internal:' YAML key instead.
ESPDEPRECATED("set_internal() is undefined behavior at runtime — components and Home Assistant are NOT "
"notified. Use the 'internal:' YAML key instead. Will be removed in 2027.3.0.",
"2026.3.0")
void set_internal(bool internal) { this->flags_.internal = internal; }
// Check if this object is declared to be disabled by default.
// That means that when the device gets added to Home Assistant (or other clients) it should
// not be added to the default view by default, and a user action is necessary to manually add it.

View File

@@ -211,6 +211,14 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
this->cancel_item_locked_(component, name_type, static_name, hash_or_id, type);
}
target->push_back(item);
if (target == &this->to_add_) {
this->to_add_count_increment_();
}
#ifndef ESPHOME_THREAD_SINGLE
else {
this->defer_count_increment_();
}
#endif
}
void HOT Scheduler::set_timeout(Component *component, const char *name, uint32_t timeout,
@@ -387,7 +395,7 @@ optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
// safe when called from the main thread. Other threads must not call this method.
// If no items, return empty optional
if (this->cleanup_() == 0)
if (!this->cleanup_())
return {};
SchedulerItem *item = this->items_[0];
@@ -421,7 +429,7 @@ void Scheduler::full_cleanup_removed_items_() {
this->items_.erase(this->items_.begin() + write, this->items_.end());
// Rebuild the heap structure since items are no longer in heap order
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
this->to_remove_ = 0;
this->to_remove_clear_();
}
#ifndef ESPHOME_THREAD_SINGLE
@@ -502,7 +510,7 @@ void HOT Scheduler::call(uint32_t now) {
// If we still have too many cancelled items, do a full cleanup
// This only happens if cancelled items are stuck in the middle/bottom of the heap
if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
if (this->to_remove_count_() >= MAX_LOGICALLY_DELETED_ITEMS) {
this->full_cleanup_removed_items_();
}
while (!this->items_.empty()) {
@@ -529,7 +537,7 @@ void HOT Scheduler::call(uint32_t now) {
LockGuard guard{this->lock_};
if (is_item_removed_locked_(item)) {
this->recycle_item_main_loop_(this->pop_raw_locked_());
this->to_remove_--;
this->to_remove_decrement_();
continue;
}
}
@@ -538,7 +546,7 @@ void HOT Scheduler::call(uint32_t now) {
if (is_item_removed_(item)) {
LockGuard guard{this->lock_};
this->recycle_item_main_loop_(this->pop_raw_locked_());
this->to_remove_--;
this->to_remove_decrement_();
continue;
}
#endif
@@ -566,7 +574,7 @@ void HOT Scheduler::call(uint32_t now) {
if (this->is_item_removed_locked_(executed_item)) {
// We were removed/cancelled in the function call, recycle and continue
this->to_remove_--;
this->to_remove_decrement_();
this->recycle_item_main_loop_(executed_item);
continue;
}
@@ -576,6 +584,7 @@ void HOT Scheduler::call(uint32_t now) {
// Add new item directly to to_add_
// since we have the lock held
this->to_add_.push_back(executed_item);
this->to_add_count_increment_();
} else {
// Timeout completed - recycle it
this->recycle_item_main_loop_(executed_item);
@@ -604,6 +613,10 @@ void HOT Scheduler::call(uint32_t now) {
#endif
}
void HOT Scheduler::process_to_add() {
// Fast path: skip lock acquisition when nothing to add.
// Worst case is a one-loop-iteration delay before newly added items are processed.
if (this->to_add_empty_())
return;
LockGuard guard{this->lock_};
for (auto *&it : this->to_add_) {
if (is_item_removed_locked_(it)) {
@@ -617,17 +630,14 @@ void HOT Scheduler::process_to_add() {
std::push_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
}
this->to_add_.clear();
this->to_add_count_clear_();
}
size_t HOT Scheduler::cleanup_() {
// Fast path: if nothing to remove, just return the current size
// Reading to_remove_ without lock is safe because:
// 1. We only call this from the main thread during call()
// 2. If it's 0, there's definitely nothing to cleanup
// 3. If it becomes non-zero after we check, cleanup will happen on the next loop iteration
// 4. Not all platforms support atomics, so we accept this race in favor of performance
// 5. The worst case is a one-loop-iteration delay in cleanup, which is harmless
if (this->to_remove_ == 0)
return this->items_.size();
bool HOT Scheduler::cleanup_() {
// Fast path: if nothing to remove, just check if items exist.
// Uses atomic load on platforms with atomics, falls back to always taking the lock otherwise.
// Worst case is a one-loop-iteration delay in cleanup.
if (this->to_remove_empty_())
return !this->items_.empty();
// We must hold the lock for the entire cleanup operation because:
// 1. We're modifying items_ (via pop_raw_locked_) which requires exclusive access
@@ -642,10 +652,10 @@ size_t HOT Scheduler::cleanup_() {
SchedulerItem *item = this->items_[0];
if (!this->is_item_removed_locked_(item))
break;
this->to_remove_--;
this->to_remove_decrement_();
this->recycle_item_main_loop_(this->pop_raw_locked_());
}
return this->items_.size();
return !this->items_.empty();
}
Scheduler::SchedulerItem *HOT Scheduler::pop_raw_locked_() {
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
@@ -698,7 +708,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, NameType name_type
size_t heap_cancelled = this->mark_matching_items_removed_locked_(this->items_, component, name_type, static_name,
hash_or_id, type, match_retry);
total_cancelled += heap_cancelled;
this->to_remove_ += heap_cancelled;
this->to_remove_add_(heap_cancelled);
}
// Cancel items in to_add_

View File

@@ -284,9 +284,9 @@ class Scheduler {
#endif
}
// Cleanup logically deleted items from the scheduler
// Returns the number of items remaining after cleanup
// Returns true if items remain after cleanup
// IMPORTANT: This method should only be called from the main thread (loop task).
size_t cleanup_();
bool cleanup_();
// Remove and return the front item from the heap as a raw pointer.
// Caller takes ownership and must either recycle or delete the item.
// IMPORTANT: Caller must hold the scheduler lock before calling this function.
@@ -395,15 +395,9 @@ class Scheduler {
// erase() on every pop, which would be O(n). The queue is processed once per loop -
// any items added during processing are left for the next loop iteration.
// Snapshot the queue end point - only process items that existed at loop start
// Items added during processing (by callbacks or other threads) run next loop
// No lock needed: single consumer (main loop), stale read just means we process less this iteration
size_t defer_queue_end = this->defer_queue_.size();
// Fast path: nothing to process, avoid lock entirely.
// Safe without lock: single consumer (main loop) reads front_, and a stale size() read
// from a concurrent push can only make us see fewer items — they'll be processed next loop.
if (this->defer_queue_front_ >= defer_queue_end)
// Worst case is a one-loop-iteration delay before newly deferred items are processed.
if (this->defer_empty_())
return;
// Merge lock acquisitions: instead of separate locks for move-out and recycle (2N+1 total),
@@ -412,6 +406,13 @@ class Scheduler {
SchedulerItem *item;
this->lock_.lock();
// Reset counter and snapshot queue end under lock
this->defer_count_clear_();
size_t defer_queue_end = this->defer_queue_.size();
if (this->defer_queue_front_ >= defer_queue_end) {
this->lock_.unlock();
return;
}
while (this->defer_queue_front_ < defer_queue_end) {
// Take ownership of the item, leaving nullptr in the vector slot.
// This is safe because:
@@ -527,14 +528,150 @@ class Scheduler {
Mutex lock_;
std::vector<SchedulerItem *> items_;
std::vector<SchedulerItem *> to_add_;
#ifndef ESPHOME_THREAD_SINGLE
// Fast-path counter for process_to_add() to skip taking the lock when there is
// nothing to add. Uses std::atomic on platforms that support it, plain uint32_t
// otherwise. On non-atomic platforms, callers must hold the scheduler lock when
// mutating this counter. Not needed on single-threaded platforms where we can
// check to_add_.empty() directly.
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> to_add_count_{0};
#else
uint32_t to_add_count_{0};
#endif
#endif /* ESPHOME_THREAD_SINGLE */
// Fast-path helper for process_to_add() to decide if it can try the lock-free path.
// - On ESPHOME_THREAD_SINGLE: direct container check is safe (no concurrent writers).
// - On ESPHOME_THREAD_MULTI_ATOMICS: performs a lock-free check via to_add_count_.
// - On ESPHOME_THREAD_MULTI_NO_ATOMICS: always returns false to force the caller
// down the locked path; this is NOT a lock-free emptiness check on that platform.
bool to_add_empty_() const {
#ifdef ESPHOME_THREAD_SINGLE
return this->to_add_.empty();
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
return this->to_add_count_.load(std::memory_order_relaxed) == 0;
#else
return false;
#endif
}
// Increment to_add_count_ (no-op on single-threaded platforms)
void to_add_count_increment_() {
#ifdef ESPHOME_THREAD_SINGLE
// No counter needed — to_add_empty_() checks the vector directly
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
this->to_add_count_.fetch_add(1, std::memory_order_relaxed);
#else
this->to_add_count_++;
#endif
}
// Reset to_add_count_ (no-op on single-threaded platforms)
void to_add_count_clear_() {
#ifdef ESPHOME_THREAD_SINGLE
// No counter needed — to_add_empty_() checks the vector directly
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
this->to_add_count_.store(0, std::memory_order_relaxed);
#else
this->to_add_count_ = 0;
#endif
}
#ifndef ESPHOME_THREAD_SINGLE
// Single-core platforms don't need the defer queue and save ~32 bytes of RAM
// Using std::vector instead of std::deque avoids 512-byte chunked allocations
// Index tracking avoids O(n) erase() calls when draining the queue each loop
std::vector<SchedulerItem *> defer_queue_; // FIFO queue for defer() calls
size_t defer_queue_front_{0}; // Index of first valid item in defer_queue_ (tracks consumed items)
#endif /* ESPHOME_THREAD_SINGLE */
// Fast-path counter for process_defer_queue_() to skip lock when nothing to process.
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> defer_count_{0};
#else
uint32_t defer_count_{0};
#endif
bool defer_empty_() const {
// defer_queue_ only exists on multi-threaded platforms, so no ESPHOME_THREAD_SINGLE path
// ESPHOME_THREAD_MULTI_NO_ATOMICS: always take the lock
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
return this->defer_count_.load(std::memory_order_relaxed) == 0;
#else
return false;
#endif
}
void defer_count_increment_() {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
this->defer_count_.fetch_add(1, std::memory_order_relaxed);
#else
this->defer_count_++;
#endif
}
void defer_count_clear_() {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
this->defer_count_.store(0, std::memory_order_relaxed);
#else
this->defer_count_ = 0;
#endif
}
#endif /* ESPHOME_THREAD_SINGLE */
// Counter for items marked for removal. Incremented cross-thread in cancel_item_locked_().
// On ESPHOME_THREAD_MULTI_ATOMICS this is read without a lock in the cleanup_() fast path;
// on ESPHOME_THREAD_MULTI_NO_ATOMICS the fast path is disabled so cleanup_() always takes the lock.
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> to_remove_{0};
#else
uint32_t to_remove_{0};
#endif
// Lock-free check if there are items to remove (for fast-path in cleanup_)
bool to_remove_empty_() const {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
return this->to_remove_.load(std::memory_order_relaxed) == 0;
#elif defined(ESPHOME_THREAD_SINGLE)
return this->to_remove_ == 0;
#else
return false; // Always take the lock path
#endif
}
void to_remove_add_(uint32_t count) {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
this->to_remove_.fetch_add(count, std::memory_order_relaxed);
#else
this->to_remove_ += count;
#endif
}
void to_remove_decrement_() {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
this->to_remove_.fetch_sub(1, std::memory_order_relaxed);
#else
this->to_remove_--;
#endif
}
void to_remove_clear_() {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
this->to_remove_.store(0, std::memory_order_relaxed);
#else
this->to_remove_ = 0;
#endif
}
uint32_t to_remove_count_() const {
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
return this->to_remove_.load(std::memory_order_relaxed);
#else
return this->to_remove_;
#endif
}
// Memory pool for recycling SchedulerItem objects to reduce heap churn.
// Design decisions:

View File

@@ -384,7 +384,7 @@ def merge_component_configs(
# Write merged config
output_file.parent.mkdir(parents=True, exist_ok=True)
yaml_content = yaml_util.dump(merged_config_data)
output_file.write_text(yaml_content)
output_file.write_text(yaml_content, encoding="utf-8")
print(f"Successfully merged {len(component_names)} components into {output_file}")