[scheduler] Fix UB in cross-thread counter/vector reads, add atomic fast-path (#14880)

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
This commit is contained in:
J. Nick Koston
2026-03-17 14:19:31 -10:00
committed by Jesse Hills
parent be2e4a5278
commit 0fa96b6e1e
2 changed files with 177 additions and 30 deletions

View File

@@ -211,6 +211,14 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
this->cancel_item_locked_(component, name_type, static_name, hash_or_id, type);
}
target->push_back(item);
if (target == &this->to_add_) {
this->to_add_count_increment_();
}
#ifndef ESPHOME_THREAD_SINGLE
else {
this->defer_count_increment_();
}
#endif
}
void HOT Scheduler::set_timeout(Component *component, const char *name, uint32_t timeout,
@@ -387,7 +395,7 @@ optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
// safe when called from the main thread. Other threads must not call this method.
// If no items, return empty optional
if (this->cleanup_() == 0)
if (!this->cleanup_())
return {};
SchedulerItem *item = this->items_[0];
@@ -421,7 +429,7 @@ void Scheduler::full_cleanup_removed_items_() {
this->items_.erase(this->items_.begin() + write, this->items_.end());
// Rebuild the heap structure since items are no longer in heap order
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
this->to_remove_ = 0;
this->to_remove_clear_();
}
#ifndef ESPHOME_THREAD_SINGLE
@@ -502,7 +510,7 @@ void HOT Scheduler::call(uint32_t now) {
// If we still have too many cancelled items, do a full cleanup
// This only happens if cancelled items are stuck in the middle/bottom of the heap
if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
if (this->to_remove_count_() >= MAX_LOGICALLY_DELETED_ITEMS) {
this->full_cleanup_removed_items_();
}
while (!this->items_.empty()) {
@@ -529,7 +537,7 @@ void HOT Scheduler::call(uint32_t now) {
LockGuard guard{this->lock_};
if (is_item_removed_locked_(item)) {
this->recycle_item_main_loop_(this->pop_raw_locked_());
this->to_remove_--;
this->to_remove_decrement_();
continue;
}
}
@@ -538,7 +546,7 @@ void HOT Scheduler::call(uint32_t now) {
if (is_item_removed_(item)) {
LockGuard guard{this->lock_};
this->recycle_item_main_loop_(this->pop_raw_locked_());
this->to_remove_--;
this->to_remove_decrement_();
continue;
}
#endif
@@ -566,7 +574,7 @@ void HOT Scheduler::call(uint32_t now) {
if (this->is_item_removed_locked_(executed_item)) {
// We were removed/cancelled in the function call, recycle and continue
this->to_remove_--;
this->to_remove_decrement_();
this->recycle_item_main_loop_(executed_item);
continue;
}
@@ -576,6 +584,7 @@ void HOT Scheduler::call(uint32_t now) {
// Add new item directly to to_add_
// since we have the lock held
this->to_add_.push_back(executed_item);
this->to_add_count_increment_();
} else {
// Timeout completed - recycle it
this->recycle_item_main_loop_(executed_item);
@@ -604,6 +613,10 @@ void HOT Scheduler::call(uint32_t now) {
#endif
}
void HOT Scheduler::process_to_add() {
// Fast path: skip lock acquisition when nothing to add.
// Worst case is a one-loop-iteration delay before newly added items are processed.
if (this->to_add_empty_())
return;
LockGuard guard{this->lock_};
for (auto *&it : this->to_add_) {
if (is_item_removed_locked_(it)) {
@@ -617,17 +630,14 @@ void HOT Scheduler::process_to_add() {
std::push_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
}
this->to_add_.clear();
this->to_add_count_clear_();
}
size_t HOT Scheduler::cleanup_() {
// Fast path: if nothing to remove, just return the current size
// Reading to_remove_ without lock is safe because:
// 1. We only call this from the main thread during call()
// 2. If it's 0, there's definitely nothing to cleanup
// 3. If it becomes non-zero after we check, cleanup will happen on the next loop iteration
// 4. Not all platforms support atomics, so we accept this race in favor of performance
// 5. The worst case is a one-loop-iteration delay in cleanup, which is harmless
if (this->to_remove_ == 0)
return this->items_.size();
bool HOT Scheduler::cleanup_() {
// Fast path: if nothing to remove, just check if items exist.
// Uses atomic load on platforms with atomics, falls back to always taking the lock otherwise.
// Worst case is a one-loop-iteration delay in cleanup.
if (this->to_remove_empty_())
return !this->items_.empty();
// We must hold the lock for the entire cleanup operation because:
// 1. We're modifying items_ (via pop_raw_locked_) which requires exclusive access
@@ -642,10 +652,10 @@ size_t HOT Scheduler::cleanup_() {
SchedulerItem *item = this->items_[0];
if (!this->is_item_removed_locked_(item))
break;
this->to_remove_--;
this->to_remove_decrement_();
this->recycle_item_main_loop_(this->pop_raw_locked_());
}
return this->items_.size();
return !this->items_.empty();
}
Scheduler::SchedulerItem *HOT Scheduler::pop_raw_locked_() {
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
@@ -698,7 +708,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, NameType name_type
size_t heap_cancelled = this->mark_matching_items_removed_locked_(this->items_, component, name_type, static_name,
hash_or_id, type, match_retry);
total_cancelled += heap_cancelled;
this->to_remove_ += heap_cancelled;
this->to_remove_add_(heap_cancelled);
}
// Cancel items in to_add_

View File

@@ -284,9 +284,9 @@ class Scheduler {
#endif
}
// Cleanup logically deleted items from the scheduler
// Returns the number of items remaining after cleanup
// Returns true if items remain after cleanup
// IMPORTANT: This method should only be called from the main thread (loop task).
size_t cleanup_();
bool cleanup_();
// Remove and return the front item from the heap as a raw pointer.
// Caller takes ownership and must either recycle or delete the item.
// IMPORTANT: Caller must hold the scheduler lock before calling this function.
@@ -395,15 +395,9 @@ class Scheduler {
// erase() on every pop, which would be O(n). The queue is processed once per loop -
// any items added during processing are left for the next loop iteration.
// Snapshot the queue end point - only process items that existed at loop start
// Items added during processing (by callbacks or other threads) run next loop
// No lock needed: single consumer (main loop), stale read just means we process less this iteration
size_t defer_queue_end = this->defer_queue_.size();
// Fast path: nothing to process, avoid lock entirely.
// Safe without lock: single consumer (main loop) reads front_, and a stale size() read
// from a concurrent push can only make us see fewer items — they'll be processed next loop.
if (this->defer_queue_front_ >= defer_queue_end)
// Worst case is a one-loop-iteration delay before newly deferred items are processed.
if (this->defer_empty_())
return;
// Merge lock acquisitions: instead of separate locks for move-out and recycle (2N+1 total),
@@ -412,6 +406,13 @@ class Scheduler {
SchedulerItem *item;
this->lock_.lock();
// Reset counter and snapshot queue end under lock
this->defer_count_clear_();
size_t defer_queue_end = this->defer_queue_.size();
if (this->defer_queue_front_ >= defer_queue_end) {
this->lock_.unlock();
return;
}
while (this->defer_queue_front_ < defer_queue_end) {
// Take ownership of the item, leaving nullptr in the vector slot.
// This is safe because:
@@ -527,14 +528,150 @@ class Scheduler {
Mutex lock_;
std::vector<SchedulerItem *> items_;
std::vector<SchedulerItem *> to_add_;
#ifndef ESPHOME_THREAD_SINGLE
// Fast-path counter for process_to_add() to skip taking the lock when there is
// nothing to add. Uses std::atomic on platforms that support it, plain uint32_t
// otherwise. On non-atomic platforms, callers must hold the scheduler lock when
// mutating this counter. Not needed on single-threaded platforms where we can
// check to_add_.empty() directly.
//
// Lifecycle: bumped through to_add_count_increment_() after an item is pushed
// onto to_add_, and zeroed through to_add_count_clear_() once process_to_add()
// has cleared the vector.
// In the non-atomic build the value is maintained but to_add_empty_() never
// reads it (that helper unconditionally returns false there).
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> to_add_count_{0};
#else
uint32_t to_add_count_{0};
#endif
#endif /* ESPHOME_THREAD_SINGLE */
// Tells process_to_add() whether it may return early without taking the lock.
// The answer is produced differently per threading model:
// - ESPHOME_THREAD_SINGLE: to_add_ is inspected directly (no concurrent writers).
// - ESPHOME_THREAD_MULTI_ATOMICS: a relaxed load of to_add_count_ is compared with zero.
// - Otherwise (multi-threaded, no atomics): the answer is always "not empty", which
//   pushes the caller onto the locked path. In that build this is NOT a real
//   emptiness test.
bool to_add_empty_() const {
#if defined(ESPHOME_THREAD_SINGLE)
  const bool nothing_pending = this->to_add_.empty();
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
  const uint32_t pending = this->to_add_count_.load(std::memory_order_relaxed);
  const bool nothing_pending = (pending == 0);
#else
  const bool nothing_pending = false;
#endif
  return nothing_pending;
}
// Records that one more item is waiting in to_add_.
// Compiles to nothing on ESPHOME_THREAD_SINGLE, where to_add_empty_() looks at
// the vector itself and no counter exists.
void to_add_count_increment_() {
#if defined(ESPHOME_THREAD_SINGLE)
  // Intentionally empty: there is no counter in this configuration.
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
  this->to_add_count_.fetch_add(1, std::memory_order_relaxed);
#else
  // Plain integer counter in the non-atomic build.
  this->to_add_count_ += 1;
#endif
}
// Zeroes the pending-add counter.
// Compiles to nothing on ESPHOME_THREAD_SINGLE, where to_add_empty_() looks at
// the vector itself and no counter exists.
void to_add_count_clear_() {
#if defined(ESPHOME_THREAD_SINGLE)
  // Intentionally empty: there is no counter in this configuration.
#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
  this->to_add_count_.store(0, std::memory_order_relaxed);
#else
  // Plain integer counter in the non-atomic build.
  this->to_add_count_ = 0U;
#endif
}
#ifndef ESPHOME_THREAD_SINGLE
// Single-threaded platforms don't need the defer queue and save ~32 bytes of RAM
// Using std::vector instead of std::deque avoids 512-byte chunked allocations
// Index tracking avoids O(n) erase() calls when draining the queue each loop
std::vector<SchedulerItem *> defer_queue_; // FIFO queue for defer() calls
size_t defer_queue_front_{0}; // Index of first valid item in defer_queue_ (tracks consumed items)
#endif /* ESPHOME_THREAD_SINGLE */
// Fast-path counter for process_defer_queue_() to skip lock when nothing to process.
// std::atomic when ESPHOME_THREAD_MULTI_ATOMICS is defined, plain uint32_t otherwise.
// Incremented through defer_count_increment_() and reset through defer_count_clear_().
// In the non-atomic build defer_empty_() never reads this value (it always returns
// false there), so the lock is always taken.
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> defer_count_{0};
#else
uint32_t defer_count_{0};
#endif
// Tells the defer-queue consumer whether it may return early without locking.
// There is no ESPHOME_THREAD_SINGLE variant because the defer queue only exists
// on multi-threaded builds.
bool defer_empty_() const {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  // No atomics available: report "not empty" so the caller always takes the lock.
  return false;
#else
  const uint32_t pending = this->defer_count_.load(std::memory_order_relaxed);
  return pending == 0;
#endif
}
// Records that one more deferred item is pending.
// Uses a relaxed atomic add when atomics are available, otherwise a plain add.
void defer_count_increment_() {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  this->defer_count_ += 1;
#else
  this->defer_count_.fetch_add(1, std::memory_order_relaxed);
#endif
}
// Zeroes the pending-defer counter.
// Uses a relaxed atomic store when atomics are available, otherwise a plain assignment.
void defer_count_clear_() {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  this->defer_count_ = 0U;
#else
  this->defer_count_.store(0, std::memory_order_relaxed);
#endif
}
#endif /* ESPHOME_THREAD_SINGLE */
// Counter for items marked for removal. Incremented cross-thread in cancel_item_locked_().
// On ESPHOME_THREAD_MULTI_ATOMICS this is read without a lock in the cleanup_() fast path;
// on ESPHOME_THREAD_MULTI_NO_ATOMICS the fast path is disabled so cleanup_() always takes the lock.
// All access goes through the to_remove_*_() helpers so the atomic and plain
// variants share the same call sites: to_remove_add_() raises it,
// to_remove_decrement_() lowers it by one, to_remove_clear_() zeroes it, and
// to_remove_empty_() / to_remove_count_() read it.
#ifdef ESPHOME_THREAD_MULTI_ATOMICS
std::atomic<uint32_t> to_remove_{0};
#else
uint32_t to_remove_{0};
#endif
// Fast-path predicate for cleanup_(): true when no items are marked for removal.
// - ESPHOME_THREAD_MULTI_ATOMICS: relaxed atomic load compared with zero.
// - ESPHOME_THREAD_SINGLE: direct comparison of the plain counter.
// - Otherwise (multi-threaded, no atomics): always false, so the caller falls
//   through to its locked path. This is not a real emptiness test in that build.
bool to_remove_empty_() const {
#if defined(ESPHOME_THREAD_MULTI_ATOMICS)
  const bool none_marked = (this->to_remove_.load(std::memory_order_relaxed) == 0);
#elif defined(ESPHOME_THREAD_SINGLE)
  const bool none_marked = (this->to_remove_ == 0);
#else
  const bool none_marked = false;  // Always take the lock path
#endif
  return none_marked;
}
// Raises the marked-for-removal counter by `count`.
// Uses a relaxed atomic add when atomics are available, otherwise a plain add.
// NOTE(review): the plain branch is unsynchronized by itself; presumably callers
// hold the scheduler lock in builds without atomics. Confirm at the call sites.
void to_remove_add_(uint32_t count) {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  this->to_remove_ = this->to_remove_ + count;
#else
  this->to_remove_.fetch_add(count, std::memory_order_relaxed);
#endif
}
// Lowers the marked-for-removal counter by one.
// Uses a relaxed atomic subtract when atomics are available, otherwise a plain subtract.
// No underflow guard is applied: the counter is unsigned and wraps if it is already zero.
void to_remove_decrement_() {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  this->to_remove_ -= 1;
#else
  this->to_remove_.fetch_sub(1, std::memory_order_relaxed);
#endif
}
// Zeroes the marked-for-removal counter.
// Uses a relaxed atomic store when atomics are available, otherwise a plain assignment.
void to_remove_clear_() {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  this->to_remove_ = 0U;
#else
  this->to_remove_.store(0, std::memory_order_relaxed);
#endif
}
// Returns the current number of items marked for removal.
// Uses a relaxed atomic load when atomics are available, otherwise a plain read.
// NOTE(review): the plain branch is an unsynchronized read. On a multi-threaded
// build without atomics, confirm that callers hold the scheduler lock or that a
// stale value is acceptable at the call site.
uint32_t to_remove_count_() const {
#ifndef ESPHOME_THREAD_MULTI_ATOMICS
  const uint32_t marked = this->to_remove_;
#else
  const uint32_t marked = this->to_remove_.load(std::memory_order_relaxed);
#endif
  return marked;
}
// Memory pool for recycling SchedulerItem objects to reduce heap churn.
// Design decisions: