From 674d030cbb47507a83c57b63a816088591a5e8a5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 7 Apr 2026 07:36:55 -1000 Subject: [PATCH] [core] Reschedule fired intervals directly into heap (#15516) --- esphome/core/scheduler.cpp | 24 ++- .../scheduler_interval_reschedule.yaml | 113 ++++++++++++ .../test_scheduler_interval_reschedule.py | 165 ++++++++++++++++++ 3 files changed, 297 insertions(+), 5 deletions(-) create mode 100644 tests/integration/fixtures/scheduler_interval_reschedule.yaml create mode 100644 tests/integration/test_scheduler_interval_reschedule.py diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index 71b29390d6..dff50b03ef 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -529,7 +529,7 @@ void HOT Scheduler::call(uint32_t now) { const auto now_64 = this->millis_64_from_(now); this->process_to_add(); - // Track if any items were added to to_add_ during this call (intervals or from callbacks) + // Track if any items were added to to_add_ during callbacks bool has_added_items = false; #ifdef ESPHOME_DEBUG_SCHEDULER @@ -578,6 +578,12 @@ void HOT Scheduler::call(uint32_t now) { if (this->to_remove_count_() >= MAX_LOGICALLY_DELETED_ITEMS) { this->full_cleanup_removed_items_(); } + // IMPORTANT: This loop uses index-based access (items_[0]), NOT iterators. + // This is intentional — fired intervals are pushed back into items_ via + // push_back() + push_heap() below, which may reallocate the vector's storage. + // Index-based access is safe across reallocations because we re-read items_[0] + // at the top of each iteration. Do NOT convert this to a range-based for loop + // or iterator-based loop, as that would break when items are added. while (!this->items_.empty()) { // Don't copy-by value yet SchedulerItem *item = this->items_[0]; @@ -646,10 +652,18 @@ void HOT Scheduler::call(uint32_t now) { if (executed_item->type == SchedulerItem::INTERVAL) { executed_item->set_next_execution(now_64 + executed_item->interval); - // Add new item directly to to_add_ - // since we have the lock held - this->to_add_.push_back(executed_item); - this->to_add_count_increment_(); + // Push directly back into the heap instead of routing through to_add_. + // This is safe because: + // 1. We're on the main loop and already hold the lock + // 2. The item was already popped from items_ via pop_raw_locked_() above + // 3. The while loop uses index-based access (items_[0]), not iterators, + // so push_back() reallocation cannot invalidate our iteration + // 4. push_heap() restores the heap invariant before the next iteration + // peeks at items_[0] + // This avoids the to_add_ detour and the overhead of + // process_to_add_slow_path_() (lock acquisition, vector iteration, clear). + this->items_.push_back(executed_item); + std::push_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp); } else { // Timeout completed - recycle it this->recycle_item_main_loop_(executed_item); diff --git a/tests/integration/fixtures/scheduler_interval_reschedule.yaml b/tests/integration/fixtures/scheduler_interval_reschedule.yaml new file mode 100644 index 0000000000..a0f882deee --- /dev/null +++ b/tests/integration/fixtures/scheduler_interval_reschedule.yaml @@ -0,0 +1,113 @@ +esphome: + name: sched-interval-resched + +host: +api: +logger: + level: DEBUG + +globals: + # Counters for each interval + - id: fast_count + type: int + initial_value: "0" + - id: medium_count + type: int + initial_value: "0" + - id: slow_count + type: int + initial_value: "0" + # Track interval that cancels itself + - id: self_cancel_count + type: int + initial_value: "0" + # Track interval that schedules a timeout from its callback + - id: callback_timeout_fired + type: bool + initial_value: "false" + # Track set_interval replacing itself from within callback + - id: replace_count + type: int + initial_value: "0" + - id: replaced_count + type: int + initial_value: "0" + +interval: + # Fast interval: 50ms + - interval: 50ms + then: + - lambda: |- + id(fast_count) += 1; + if (id(fast_count) == 10) { + ESP_LOGI("test", "FAST_10_REACHED"); + } + + # Medium interval: 100ms + - interval: 100ms + then: + - lambda: |- + id(medium_count) += 1; + if (id(medium_count) == 5) { + ESP_LOGI("test", "MEDIUM_5_REACHED fast_count=%d", id(fast_count)); + } + + # Slow interval: 200ms + - interval: 200ms + then: + - lambda: |- + id(slow_count) += 1; + if (id(slow_count) == 3) { + ESP_LOGI("test", "SLOW_3_REACHED fast_count=%d medium_count=%d", id(fast_count), id(medium_count)); + } + + # Interval that cancels itself after 3 fires + - interval: 75ms + id: self_cancelling + then: + - lambda: |- + id(self_cancel_count) += 1; + ESP_LOGI("test", "SELF_CANCEL_FIRE count=%d", id(self_cancel_count)); + if (id(self_cancel_count) >= 3) { + id(self_cancelling)->stop_poller(); + ESP_LOGI("test", "SELF_CANCEL_STOPPED"); + } + + # Interval that schedules a timeout from its callback (tests to_add_ path) + - interval: 150ms + id: timeout_creator + then: + - lambda: |- + if (!id(callback_timeout_fired)) { + // Schedule a one-shot timeout from within an interval callback + // This goes through to_add_ (not the direct push_heap path) + id(timeout_creator)->set_timeout("test_timeout", 10, []() { + id(callback_timeout_fired) = true; + ESP_LOGI("test", "CALLBACK_TIMEOUT_FIRED"); + }); + // Stop this interval after scheduling the timeout + id(timeout_creator)->stop_poller(); + } + + # Interval that calls set_interval with the same name from within its callback, + # replacing itself. Tests that the old item (marked removed) is not rescheduled + # via push_heap, and the new item goes through to_add_ correctly. + - interval: 60ms + id: replace_test + then: + - lambda: |- + id(replace_count) += 1; + if (id(replace_count) == 1) { + ESP_LOGI("test", "REPLACE_ORIGINAL_FIRE"); + // Replace the polling interval with a named interval on the same component + id(replace_test)->set_interval("replaced_interval", 80, []() { + id(replaced_count) += 1; + ESP_LOGI("test", "REPLACED_FIRE count=%d", id(replaced_count)); + if (id(replaced_count) >= 3) { + id(replace_test)->cancel_interval("replaced_interval"); + ESP_LOGI("test", "REPLACED_STOPPED"); + } + }); + // Stop the original polling interval + id(replace_test)->stop_poller(); + } diff --git a/tests/integration/test_scheduler_interval_reschedule.py b/tests/integration/test_scheduler_interval_reschedule.py new file mode 100644 index 0000000000..d141bd9f15 --- /dev/null +++ b/tests/integration/test_scheduler_interval_reschedule.py @@ -0,0 +1,165 @@ +"""Test that intervals are correctly rescheduled after firing. + +This test verifies the optimization where fired intervals are pushed directly +back into the scheduler's heap (items_) via push_back() + push_heap(), instead +of routing through the to_add_ staging vector and process_to_add_slow_path_(). + +Key scenarios tested: +1. Multiple intervals at different periods all fire at correct rates +2. Heap ordering is preserved — faster intervals fire proportionally more often +3. An interval that cancels itself mid-callback is not rescheduled +4. A timeout scheduled from within an interval callback (to_add_ path) still works +5. An interval that replaces itself via set_interval from within its callback +""" + +from __future__ import annotations + +import asyncio +import re + +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_scheduler_interval_reschedule( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that intervals are correctly rescheduled via direct heap insertion.""" + loop = asyncio.get_running_loop() + + # Futures for each milestone + fast_10_future: asyncio.Future[None] = loop.create_future() + medium_5_future: asyncio.Future[tuple[int]] = loop.create_future() + slow_3_future: asyncio.Future[tuple[int, int]] = loop.create_future() + self_cancel_stopped_future: asyncio.Future[None] = loop.create_future() + callback_timeout_future: asyncio.Future[None] = loop.create_future() + replace_original_future: asyncio.Future[None] = loop.create_future() + replaced_stopped_future: asyncio.Future[None] = loop.create_future() + + self_cancel_fire_count = 0 + replaced_fire_count = 0 + + def on_log_line(line: str) -> None: + nonlocal self_cancel_fire_count, replaced_fire_count + + if "FAST_10_REACHED" in line and not fast_10_future.done(): + fast_10_future.set_result(None) + + match = re.search(r"MEDIUM_5_REACHED fast_count=(\d+)", line) + if match and not medium_5_future.done(): + medium_5_future.set_result((int(match.group(1)),)) + + match = re.search(r"SLOW_3_REACHED fast_count=(\d+) medium_count=(\d+)", line) + if match and not slow_3_future.done(): + slow_3_future.set_result((int(match.group(1)), int(match.group(2)))) + + match = re.search(r"SELF_CANCEL_FIRE count=(\d+)", line) + if match: + self_cancel_fire_count = int(match.group(1)) + + if "SELF_CANCEL_STOPPED" in line and not self_cancel_stopped_future.done(): + self_cancel_stopped_future.set_result(None) + + if "CALLBACK_TIMEOUT_FIRED" in line and not callback_timeout_future.done(): + callback_timeout_future.set_result(None) + + if "REPLACE_ORIGINAL_FIRE" in line and not replace_original_future.done(): + replace_original_future.set_result(None) + + match = re.search(r"REPLACED_FIRE count=(\d+)", line) + if match: + replaced_fire_count = int(match.group(1)) + + if "REPLACED_STOPPED" in line and not replaced_stopped_future.done(): + replaced_stopped_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "sched-interval-resched" + + # 1. Fast interval (50ms) should reach 10 fires within ~600ms + try: + await asyncio.wait_for(fast_10_future, timeout=5.0) + except TimeoutError: + pytest.fail("Fast interval (50ms) did not fire 10 times") + + # 2. Medium interval (100ms) should reach 5 fires + # At that point, fast_count should be roughly 2x medium_count + try: + result = await asyncio.wait_for(medium_5_future, timeout=5.0) + except TimeoutError: + pytest.fail("Medium interval (100ms) did not fire 5 times") + + fast_at_medium_5 = result[0] + # Fast runs at 50ms, medium at 100ms, so fast should be ~2x medium + # Allow some slack for scheduling jitter + assert fast_at_medium_5 >= 7, ( + f"Fast interval should have fired at least 7 times when medium hit 5, " + f"but only fired {fast_at_medium_5} times" + ) + + # 3. Slow interval (200ms) should reach 3 fires + # At that point, both fast and medium should have proportionally more fires + try: + result = await asyncio.wait_for(slow_3_future, timeout=5.0) + except TimeoutError: + pytest.fail("Slow interval (200ms) did not fire 3 times") + + fast_at_slow_3, medium_at_slow_3 = result + # At 600ms: fast ~12, medium ~6, slow 3 + assert fast_at_slow_3 >= 8, ( + f"Fast should have fired at least 8 times when slow hit 3, " + f"but only fired {fast_at_slow_3}" + ) + assert medium_at_slow_3 >= 4, ( + f"Medium should have fired at least 4 times when slow hit 3, " + f"but only fired {medium_at_slow_3}" + ) + + # 4. Self-cancelling interval should have stopped after exactly 3 fires + try: + await asyncio.wait_for(self_cancel_stopped_future, timeout=5.0) + except TimeoutError: + pytest.fail("Self-cancelling interval did not stop") + + # Wait a bit to ensure it doesn't fire again + await asyncio.sleep(0.3) + assert self_cancel_fire_count == 3, ( + f"Self-cancelling interval fired {self_cancel_fire_count} times, " + f"expected exactly 3" + ) + + # 5. Timeout scheduled from interval callback should have fired + try: + await asyncio.wait_for(callback_timeout_future, timeout=5.0) + except TimeoutError: + pytest.fail("Timeout scheduled from interval callback did not fire") + + # 6. Interval that replaces itself via set_interval from within callback + # The original fires once, sets up a new named interval, then stops itself. + # The replacement interval should fire 3 times then cancel itself. + try: + await asyncio.wait_for(replace_original_future, timeout=5.0) + except TimeoutError: + pytest.fail("Replace-test original interval did not fire") + + try: + await asyncio.wait_for(replaced_stopped_future, timeout=5.0) + except TimeoutError: + pytest.fail( + f"Replaced interval did not stop. Fired {replaced_fire_count} times" + ) + + # Wait to ensure replacement doesn't fire again after cancellation + await asyncio.sleep(0.3) + assert replaced_fire_count == 3, ( + f"Replaced interval fired {replaced_fire_count} times, expected exactly 3" + )