mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 16:02:40 +00:00
478 lines
15 KiB
Python
478 lines
15 KiB
Python
"""Tests for esphome.automation module."""
|
|
|
|
from collections.abc import Generator
|
|
from unittest.mock import AsyncMock, call, patch
|
|
|
|
import pytest
|
|
|
|
from esphome.automation import (
|
|
CallbackAutomation,
|
|
TriggerForwarder,
|
|
TriggerOnFalseForwarder,
|
|
TriggerOnTrueForwarder,
|
|
build_callback_automations,
|
|
has_non_synchronous_actions,
|
|
)
|
|
from esphome.cpp_generator import MockObj, RawExpression
|
|
from esphome.util import RegistryEntry
|
|
|
|
|
|
def _make_registry(non_synchronous_actions: set[str]) -> dict[str, RegistryEntry]:
|
|
"""Create a mock ACTION_REGISTRY with specified non-synchronous actions.
|
|
|
|
Uses the default synchronous=False, matching the real registry behavior.
|
|
"""
|
|
registry: dict[str, RegistryEntry] = {}
|
|
for name in non_synchronous_actions:
|
|
registry[name] = RegistryEntry(name, lambda: None, None, None)
|
|
return registry
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_registry() -> Generator[dict[str, RegistryEntry]]:
|
|
"""Fixture that patches ACTION_REGISTRY with delay, wait_until, script.wait as non-synchronous."""
|
|
registry: dict[str, RegistryEntry] = _make_registry(
|
|
{"delay", "wait_until", "script.wait"}
|
|
)
|
|
registry["logger.log"] = RegistryEntry(
|
|
"logger.log", lambda: None, None, None, synchronous=True
|
|
)
|
|
with patch("esphome.automation.ACTION_REGISTRY", registry):
|
|
yield registry
|
|
|
|
|
|
def test_has_non_synchronous_actions_empty_list(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions([]) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_empty_dict(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions({}) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_non_dict_non_list(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions("string") is False
|
|
assert has_non_synchronous_actions(42) is False
|
|
assert has_non_synchronous_actions(None) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_delay(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions([{"delay": "1s"}]) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_wait_until(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions([{"wait_until": {"condition": {}}}]) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_script_wait(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions([{"script.wait": "script_id"}]) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_synchronous(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert has_non_synchronous_actions([{"logger.log": "hello"}]) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_unknown_not_in_registry(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""Unknown actions not in registry are not flagged (only registered actions count)."""
|
|
assert has_non_synchronous_actions([{"unknown.action": "value"}]) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_default_non_synchronous(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""Actions registered without explicit synchronous=True default to non-synchronous."""
|
|
mock_registry["some.action"] = RegistryEntry(
|
|
"some.action", lambda: None, None, None
|
|
)
|
|
assert has_non_synchronous_actions([{"some.action": "value"}]) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_nested_in_then(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""Non-synchronous action nested inside a synchronous action's then block."""
|
|
actions: list[dict[str, object]] = [
|
|
{
|
|
"logger.log": "first",
|
|
"then": [{"delay": "1s"}],
|
|
}
|
|
]
|
|
assert has_non_synchronous_actions(actions) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_deeply_nested(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""Non-synchronous action deeply nested in action structure."""
|
|
actions: list[dict[str, object]] = [
|
|
{
|
|
"if": {
|
|
"then": [
|
|
{"logger.log": "hello"},
|
|
{"delay": "500ms"},
|
|
]
|
|
}
|
|
}
|
|
]
|
|
assert has_non_synchronous_actions(actions) is True
|
|
|
|
|
|
def test_has_non_synchronous_actions_none_in_nested(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""No non-synchronous actions even with nesting."""
|
|
actions: list[dict[str, object]] = [
|
|
{
|
|
"if": {
|
|
"then": [
|
|
{"logger.log": "hello"},
|
|
]
|
|
}
|
|
}
|
|
]
|
|
assert has_non_synchronous_actions(actions) is False
|
|
|
|
|
|
def test_has_non_synchronous_actions_multiple_one_non_synchronous(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert (
|
|
has_non_synchronous_actions(
|
|
[
|
|
{"logger.log": "first"},
|
|
{"delay": "1s"},
|
|
{"logger.log": "second"},
|
|
]
|
|
)
|
|
is True
|
|
)
|
|
|
|
|
|
def test_has_non_synchronous_actions_multiple_all_synchronous(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
assert (
|
|
has_non_synchronous_actions(
|
|
[
|
|
{"logger.log": "first"},
|
|
{"logger.log": "second"},
|
|
]
|
|
)
|
|
is False
|
|
)
|
|
|
|
|
|
def test_has_non_synchronous_actions_dict_input(
|
|
mock_registry: dict[str, RegistryEntry],
|
|
) -> None:
|
|
"""Direct dict input (single action)."""
|
|
assert has_non_synchronous_actions({"delay": "1s"}) is True
|
|
assert has_non_synchronous_actions({"logger.log": "hello"}) is False
|
|
|
|
|
|
def _build_forwarder(
|
|
automation_name: str,
|
|
args: list[tuple[str, str]],
|
|
forwarder: MockObj | None = None,
|
|
) -> str:
|
|
"""Build a trigger forwarder expression the same way build_callback_automation does.
|
|
|
|
Mirrors the forwarder selection logic in automation.build_callback_automation.
|
|
"""
|
|
import esphome.codegen as cg
|
|
|
|
obj = MockObj(automation_name, "->")
|
|
if forwarder is None:
|
|
arg_types = [RawExpression(t) for t, _ in args]
|
|
templ = (
|
|
cg.TemplateArguments(*arg_types) if arg_types else cg.TemplateArguments()
|
|
)
|
|
forwarder = TriggerForwarder.template(templ)
|
|
return f"{forwarder}{{{obj}}}"
|
|
|
|
|
|
def test_trigger_forwarder_no_args() -> None:
|
|
"""Button on_press: TriggerForwarder<> with no args."""
|
|
result = _build_forwarder("auto_1", [])
|
|
assert result == "TriggerForwarder<>{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_single_float_arg() -> None:
|
|
"""Sensor on_value: TriggerForwarder<float>."""
|
|
result = _build_forwarder("auto_1", [("float", "x")])
|
|
assert result == "TriggerForwarder<float>{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_single_bool_arg() -> None:
|
|
"""Switch on_state: TriggerForwarder<bool>."""
|
|
result = _build_forwarder("auto_1", [("bool", "x")])
|
|
assert result == "TriggerForwarder<bool>{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_on_true() -> None:
|
|
"""Binary_sensor on_press / switch on_turn_on: TriggerOnTrueForwarder."""
|
|
result = _build_forwarder("auto_1", [], forwarder=TriggerOnTrueForwarder)
|
|
assert result == "TriggerOnTrueForwarder{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_on_false() -> None:
|
|
"""Binary_sensor on_release / switch on_turn_off: TriggerOnFalseForwarder."""
|
|
result = _build_forwarder("auto_1", [], forwarder=TriggerOnFalseForwarder)
|
|
assert result == "TriggerOnFalseForwarder{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_multiple_args() -> None:
|
|
"""Binary_sensor on_state_change: TriggerForwarder with two args."""
|
|
result = _build_forwarder(
|
|
"auto_1",
|
|
[("optional<bool>", "x_previous"), ("optional<bool>", "x")],
|
|
)
|
|
assert result == "TriggerForwarder<optional<bool>, optional<bool>>{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_string_arg() -> None:
|
|
"""Text_sensor on_value: TriggerForwarder<std::string>."""
|
|
result = _build_forwarder("auto_1", [("std::string", "x")])
|
|
assert result == "TriggerForwarder<std::string>{auto_1}"
|
|
|
|
|
|
def test_trigger_forwarder_custom_type() -> None:
|
|
"""Custom forwarder type passed directly."""
|
|
custom = MockObj("MyForwarder", "")
|
|
result = _build_forwarder("auto_1", [], forwarder=custom)
|
|
assert result == "MyForwarder{auto_1}"
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_build_callback() -> Generator[AsyncMock]:
|
|
"""Patch build_callback_automation to capture calls."""
|
|
with patch(
|
|
"esphome.automation.build_callback_automation", new_callable=AsyncMock
|
|
) as mock:
|
|
yield mock
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_empty_entries(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""No entries means no calls."""
|
|
parent = MockObj("var", "->")
|
|
await build_callback_automations(parent, {}, ())
|
|
mock_build_callback.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_missing_config_key(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Entry present but config key missing -- no calls."""
|
|
parent = MockObj("var", "->")
|
|
await build_callback_automations(
|
|
parent,
|
|
{},
|
|
(CallbackAutomation("on_state", "add_on_state_callback", [(bool, "x")]),),
|
|
)
|
|
mock_build_callback.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_single_entry(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Single entry with one config triggers one call."""
|
|
parent = MockObj("var", "->")
|
|
conf: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {"on_state": [conf]}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(CallbackAutomation("on_state", "add_on_state_callback", [(bool, "x")]),),
|
|
)
|
|
mock_build_callback.assert_called_once_with(
|
|
parent, "add_on_state_callback", [(bool, "x")], conf, forwarder=None
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_multiple_configs(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Single entry with multiple configs triggers multiple calls."""
|
|
parent = MockObj("var", "->")
|
|
conf1: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
conf2: dict[str, object] = {"automation_id": "auto_2", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {"on_state": [conf1, conf2]}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(CallbackAutomation("on_state", "add_on_state_callback", [(bool, "x")]),),
|
|
)
|
|
assert mock_build_callback.call_count == 2
|
|
mock_build_callback.assert_any_call(
|
|
parent, "add_on_state_callback", [(bool, "x")], conf1, forwarder=None
|
|
)
|
|
mock_build_callback.assert_any_call(
|
|
parent, "add_on_state_callback", [(bool, "x")], conf2, forwarder=None
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_multiple_entries(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Multiple entries each with one config."""
|
|
parent = MockObj("var", "->")
|
|
conf_a: dict[str, object] = {"automation_id": "auto_a", "then": []}
|
|
conf_b: dict[str, object] = {"automation_id": "auto_b", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {
|
|
"on_value": [conf_a],
|
|
"on_raw_value": [conf_b],
|
|
}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(
|
|
CallbackAutomation("on_value", "add_on_value_callback", [(float, "x")]),
|
|
CallbackAutomation(
|
|
"on_raw_value", "add_on_raw_value_callback", [(float, "x")]
|
|
),
|
|
),
|
|
)
|
|
assert mock_build_callback.call_count == 2
|
|
assert mock_build_callback.call_args_list == [
|
|
call(parent, "add_on_value_callback", [(float, "x")], conf_a, forwarder=None),
|
|
call(
|
|
parent, "add_on_raw_value_callback", [(float, "x")], conf_b, forwarder=None
|
|
),
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_with_forwarder(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Entry with forwarder passes it through."""
|
|
parent = MockObj("var", "->")
|
|
conf: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {"on_press": [conf]}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(
|
|
CallbackAutomation(
|
|
"on_press", "add_on_state_callback", forwarder=TriggerOnTrueForwarder
|
|
),
|
|
),
|
|
)
|
|
mock_build_callback.assert_called_once_with(
|
|
parent, "add_on_state_callback", [], conf, forwarder=TriggerOnTrueForwarder
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_mixed_entries(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Mix of entries with args, forwarders, and defaults."""
|
|
parent = MockObj("var", "->")
|
|
conf_state: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
conf_press: dict[str, object] = {"automation_id": "auto_2", "then": []}
|
|
conf_release: dict[str, object] = {"automation_id": "auto_3", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {
|
|
"on_state": [conf_state],
|
|
"on_press": [conf_press],
|
|
"on_release": [conf_release],
|
|
}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(
|
|
CallbackAutomation("on_state", "add_on_state_callback", [(bool, "x")]),
|
|
CallbackAutomation(
|
|
"on_press", "add_on_state_callback", forwarder=TriggerOnTrueForwarder
|
|
),
|
|
CallbackAutomation(
|
|
"on_release", "add_on_state_callback", forwarder=TriggerOnFalseForwarder
|
|
),
|
|
),
|
|
)
|
|
assert mock_build_callback.call_count == 3
|
|
assert mock_build_callback.call_args_list == [
|
|
call(
|
|
parent, "add_on_state_callback", [(bool, "x")], conf_state, forwarder=None
|
|
),
|
|
call(
|
|
parent,
|
|
"add_on_state_callback",
|
|
[],
|
|
conf_press,
|
|
forwarder=TriggerOnTrueForwarder,
|
|
),
|
|
call(
|
|
parent,
|
|
"add_on_state_callback",
|
|
[],
|
|
conf_release,
|
|
forwarder=TriggerOnFalseForwarder,
|
|
),
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_skips_missing_keys(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Entries whose config keys are absent are silently skipped."""
|
|
parent = MockObj("var", "->")
|
|
conf: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {"on_press": [conf]}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(
|
|
CallbackAutomation(
|
|
"on_press", "add_on_state_callback", forwarder=TriggerOnTrueForwarder
|
|
),
|
|
CallbackAutomation(
|
|
"on_release", "add_on_state_callback", forwarder=TriggerOnFalseForwarder
|
|
),
|
|
),
|
|
)
|
|
mock_build_callback.assert_called_once_with(
|
|
parent, "add_on_state_callback", [], conf, forwarder=TriggerOnTrueForwarder
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_callback_automations_defaults(
|
|
mock_build_callback: AsyncMock,
|
|
) -> None:
|
|
"""Verify CallbackAutomation with only required fields defaults args=[] and forwarder=None."""
|
|
parent = MockObj("var", "->")
|
|
conf: dict[str, object] = {"automation_id": "auto_1", "then": []}
|
|
config: dict[str, list[dict[str, object]]] = {"on_press": [conf]}
|
|
await build_callback_automations(
|
|
parent,
|
|
config,
|
|
(CallbackAutomation("on_press", "add_on_press_callback"),),
|
|
)
|
|
mock_build_callback.assert_called_once_with(
|
|
parent, "add_on_press_callback", [], conf, forwarder=None
|
|
)
|