[motion] Implement hub component for IMUs (#16226)

Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: Jonathan Swoboda <154711427+swoboda1337@users.noreply.github.com>
This commit is contained in:
Clyde Stubbs
2026-06-06 18:10:08 +10:00
committed by GitHub
parent 6996b7ed1c
commit 745db9f705
7 changed files with 1597 additions and 0 deletions

View File

@@ -351,6 +351,7 @@ esphome/components/modbus_server/* @exciton
esphome/components/mopeka_ble/* @Fabian-Schmidt @spbrogan
esphome/components/mopeka_pro_check/* @spbrogan
esphome/components/mopeka_std_check/* @Fabian-Schmidt
esphome/components/motion/* @esphome/core
esphome/components/mpl3115a2/* @kbickar
esphome/components/mpu6886/* @fabaff
esphome/components/ms8607/* @e28eta

View File

@@ -0,0 +1,221 @@
from collections.abc import Callable
import re
from esphome import automation
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_ON_ERROR, CONF_ON_SUCCESS
from esphome.cpp_generator import MockObj, MockObjClass
from esphome.helpers import fnv1_hash_object_id
CODEOWNERS = ["@esphome/core"]
DOMAIN = "motion"
IS_PLATFORM_COMPONENT = True
# C++ namespace / class
motion_ns = cg.esphome_ns.namespace("motion")
MotionComponent = motion_ns.class_("MotionComponent", cg.PollingComponent)
AXES = ["x", "y", "z"]
CONF_AXIS_MAP = "axis_map"
CONF_MOTION_ID = "motion_id"
CONF_TRANSFORM_MATRIX = "transform_matrix"
CalibrateLevelAction = motion_ns.class_("CalibrateLevelAction", automation.Action)
CalibrateHeadingAction = motion_ns.class_("CalibrateHeadingAction", automation.Action)
ClearCalibrationAction = motion_ns.class_("ClearCalibrationAction", automation.Action)
KEY_ACCELEROMETER = "accelerometer"
KEY_GYROSCOPE = "gyroscope"
SENSOR_SCHEMA = cv.Schema(
{
cv.GenerateID(CONF_MOTION_ID): cv.use_id(MotionComponent),
}
)
_AXIS_REGEX = re.compile(r"^[+-]?[xyz]$", re.IGNORECASE)
def _axis_map(config: dict) -> dict:
errors = []
for key, axis in config.items():
if _AXIS_REGEX.fullmatch(axis) is None:
errors.append(
cv.Invalid(
"Each 'axis_map' config value must be one of 'x', 'y' or 'z' (optionally preceded by '+' or '-').",
path=[key],
)
)
values = {x.lower().removeprefix("-").removeprefix("+") for x in config.values()}
if values != set(AXES):
errors.append(cv.Invalid("Each axis may be mapped only once"))
if errors:
raise cv.MultipleInvalid(errors)
return config
def _axis_map_to_matrix(config: dict[str, str]) -> list[float]:
matrix = []
for target_axis in AXES:
source_axis = config[target_axis].lower()
sign = -1.0 if source_axis.startswith("-") else 1.0
source_axis = source_axis.removeprefix("+").removeprefix("-")
row = [0.0, 0.0, 0.0]
row[AXES.index(source_axis)] = sign
matrix.extend(row)
return matrix
def _transform_matrix(value):
"""Accept a flat list of 9 floats or a 3x3 nested list."""
if not isinstance(value, list) or len(value) == 0:
raise cv.Invalid("Expected a list of 9 numbers or a 3x3 nested list")
# Nested 3x3
if isinstance(value[0], list):
if len(value) != 3:
raise cv.Invalid(f"3x3 matrix must have 3 rows, got {len(value)}")
flat = []
for i, row in enumerate(value):
if not isinstance(row, list) or len(row) != 3:
raise cv.Invalid("Each row must be a list of 3 numbers", path=[i])
flat.extend(cv.float_(v) for v in row)
return flat
# Flat list
if len(value) != 9:
raise cv.Invalid(f"Flat matrix must have exactly 9 values, got {len(value)}")
return [cv.float_(v) for v in value]
def _validate_matrix_options(config):
if CONF_AXIS_MAP in config and CONF_TRANSFORM_MATRIX in config:
raise cv.Invalid(
f"'{CONF_AXIS_MAP}' and '{CONF_TRANSFORM_MATRIX}' are mutually exclusive"
)
return config
# Top-level CONFIG_SCHEMA
_CONFIG_SCHEMA = (
cv.Schema(
{
cv.Optional(CONF_AXIS_MAP): cv.All(
{cv.Required(k): cv.string_strict for k in AXES},
_axis_map,
),
cv.Optional(CONF_TRANSFORM_MATRIX): _transform_matrix,
}
)
.extend(cv.polling_component_schema("250ms"))
.add_extra(_validate_matrix_options)
)
def _add_data(has_accel: bool, has_gyro: bool) -> Callable[[dict], dict]:
def validator(config):
config = config.copy()
config[KEY_ACCELEROMETER] = has_accel
config[KEY_GYROSCOPE] = has_gyro
return config
return validator
def motion_schema(class_: MockObjClass, has_accel: bool, has_gyro: bool) -> cv.Schema:
return _CONFIG_SCHEMA.extend(
{
cv.GenerateID(): cv.declare_id(class_),
}
).add_extra(_add_data(has_accel, has_gyro))
# Code generation
async def register_motion_component(var: MockObj, config) -> None:
await cg.register_component(var, config)
# Set preference key for NVS save/restore (based on component ID)
obj_id = config[CONF_ID].id
pref_hash = fnv1_hash_object_id(obj_id)
cg.add(var.set_calibration_key(pref_hash))
if axis_map := config.get(CONF_AXIS_MAP):
cg.add(var.set_matrix(_axis_map_to_matrix(axis_map)))
elif transform_matrix := config.get(CONF_TRANSFORM_MATRIX):
cg.add(var.set_matrix(transform_matrix))
async def new_motion_component(config: dict) -> MockObj:
var = cg.new_Pvariable(config[CONF_ID])
await register_motion_component(var, config)
return var
# --- Actions ---
CONF_SAVE = "save"
CALIBRATE_ACTION_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.use_id(MotionComponent),
cv.Optional(CONF_SAVE, default=False): cv.boolean,
cv.Optional(CONF_ON_SUCCESS): automation.validate_automation(single=True),
cv.Optional(CONF_ON_ERROR): automation.validate_automation(single=True),
}
)
async def _build_calibrate_action(config, action_id, template_arg, args):
parent = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, parent)
if config.get(CONF_SAVE):
cg.add(var.set_save(True))
if on_success := config.get(CONF_ON_SUCCESS):
await automation.build_automation(var.get_success_trigger(), [], on_success)
if on_error := config.get(CONF_ON_ERROR):
await automation.build_automation(var.get_error_trigger(), [], on_error)
return var
@automation.register_action(
"motion.calibrate_level",
CalibrateLevelAction,
CALIBRATE_ACTION_SCHEMA,
synchronous=True,
)
async def calibrate_level_to_code(config, action_id, template_arg, args):
return await _build_calibrate_action(config, action_id, template_arg, args)
@automation.register_action(
"motion.calibrate_heading",
CalibrateHeadingAction,
CALIBRATE_ACTION_SCHEMA,
synchronous=True,
)
async def calibrate_heading_to_code(config, action_id, template_arg, args):
return await _build_calibrate_action(config, action_id, template_arg, args)
CLEAR_ACTION_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.use_id(MotionComponent),
cv.Optional(CONF_SAVE, default=False): cv.boolean,
}
)
@automation.register_action(
"motion.clear_calibration",
ClearCalibrationAction,
CLEAR_ACTION_SCHEMA,
synchronous=True,
)
async def clear_calibration_to_code(config, action_id, template_arg, args):
parent = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, parent)
if config.get(CONF_SAVE):
cg.add(var.set_save(True))
return var

View File

@@ -0,0 +1,194 @@
#include "motion_component.h"
#include "esphome/core/log.h"
namespace esphome::motion {
static const char *const TAG = "motion";
static void log_matrix(const float m[9]) {
ESP_LOGCONFIG(TAG, " Calibration matrix:");
ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[0], m[1], m[2]);
ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[3], m[4], m[5]);
ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[6], m[7], m[8]);
}
// FNV-1a over the raw bytes of the matrix. Identical axis maps always yield
// bit-identical matrices, so this is a stable fingerprint of the build-time base.
static uint32_t hash_matrix(const float m[9]) {
const uint8_t *bytes = reinterpret_cast<const uint8_t *>(m);
uint32_t hash = 2166136261UL;
for (size_t i = 0; i < sizeof(float) * 9; i++) {
hash ^= bytes[i];
hash *= 16777619UL;
}
return hash;
}
void MotionComponent::setup() {
// matrix_ currently holds the build-time base (set_matrix ran during codegen).
this->base_hash_ = hash_matrix(this->base_matrix_);
this->pref_ = global_preferences->make_preference<CalibrationPref>(this->pref_key_);
CalibrationPref saved;
if (this->pref_.load(&saved) && saved.base_hash == this->base_hash_) {
memcpy(this->matrix_, saved.matrix, sizeof(this->matrix_));
ESP_LOGI(TAG, "Restored calibration from NVS");
} else {
ESP_LOGD(TAG, "No matching saved calibration; using build-time matrix");
}
log_matrix(this->matrix_);
}
void MotionComponent::dump_config() {
LOG_UPDATE_INTERVAL(this);
log_matrix(this->matrix_);
}
bool MotionComponent::save_calibration() {
if (this->pref_key_ == 0) {
ESP_LOGW(TAG, "Cannot save calibration: no preference key set");
return false;
}
CalibrationPref pref{this->base_hash_, {}};
memcpy(pref.matrix, this->matrix_, sizeof(pref.matrix));
if (this->pref_.save(&pref)) {
global_preferences->sync();
ESP_LOGI(TAG, "Saved calibration to NVS");
return true;
}
ESP_LOGW(TAG, "Calibration save failed");
return false;
}
void MotionComponent::clear_calibration() {
memcpy(this->matrix_, this->base_matrix_, sizeof(this->matrix_));
ESP_LOGI(TAG, "Calibration reset to build-time matrix");
log_matrix(this->matrix_);
}
void MotionComponent::update() {
if (this->is_failed())
return;
MotionData motion_data{};
MotionData raw_data{};
if (!this->update_data(raw_data))
return;
this->map_axes_(motion_data.acceleration, raw_data.acceleration);
this->map_axes_(motion_data.angular_rate, raw_data.angular_rate);
this->motion_data_callback_.call(motion_data);
ESP_LOGV(TAG, "Accel: [%.3f, %.3f, %.3f] g; Gyro: [%.3f, %.3f, %.3f] °/s", motion_data.acceleration[X_AXIS],
motion_data.acceleration[Y_AXIS], motion_data.acceleration[Z_AXIS], motion_data.angular_rate[X_AXIS],
motion_data.angular_rate[Y_AXIS], motion_data.angular_rate[Z_AXIS]);
}
bool MotionComponent::calibrate_level() {
MotionData raw{};
if (!this->update_data(raw)) {
ESP_LOGW(TAG, "calibrate_level: failed to read sensor data");
return false;
}
// Apply the current matrix first so any existing axis mapping is preserved.
float mapped[3];
this->map_axes_(mapped, raw.acceleration);
float nx = mapped[X_AXIS];
float ny = mapped[Y_AXIS];
float nz = mapped[Z_AXIS];
float mag = std::sqrt(nx * nx + ny * ny + nz * nz);
if (mag < 0.1f) {
ESP_LOGW(TAG, "calibrate_level: acceleration magnitude too small (%.3f)", mag);
return false;
}
// Normalize
nx /= mag;
ny /= mag;
nz /= mag;
// Compute rotation matrix R such that R * [nx, ny, nz] = [0, 0, 1]
// using Rodrigues' rotation formula, then compose with the existing matrix.
if (nz > 0.99999f) {
// Already aligned with +Z — nothing to compose
ESP_LOGI(TAG, "Level calibration: already aligned");
log_matrix(this->matrix_);
// returning true here will trigger on_success and a save to NVS, but the save will ultimately be a no-op
// since the backend sync will not write unchanged values.
return true;
}
float r[9];
if (nz < -0.9999f) {
// Aligned with -Z — 180° rotation about X
float m[9] = {1, 0, 0, 0, -1, 0, 0, 0, -1};
memcpy(r, m, sizeof(r));
} else {
float f = 1.0f / (1.0f + nz);
r[0] = 1.0f - nx * nx * f;
r[1] = -nx * ny * f;
r[2] = -nx;
r[3] = -nx * ny * f;
r[4] = 1.0f - ny * ny * f;
r[5] = -ny;
r[6] = nx;
r[7] = ny;
r[8] = nz;
}
// Compose: new_matrix = R * old_matrix
float old[9];
memcpy(old, this->matrix_, sizeof(old));
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
this->matrix_[i * 3 + j] = r[i * 3 + 0] * old[j] + r[i * 3 + 1] * old[3 + j] + r[i * 3 + 2] * old[6 + j];
}
}
ESP_LOGI(TAG, "Level calibration applied (mapped accel: [%.3f, %.3f, %.3f])", mapped[X_AXIS], mapped[Y_AXIS],
mapped[Z_AXIS]);
log_matrix(this->matrix_);
return true;
}
bool MotionComponent::calibrate_heading() {
MotionData raw{};
if (!this->update_data(raw)) {
ESP_LOGW(TAG, "calibrate_heading: failed to read sensor data");
return false;
}
// Apply current matrix to get the mapped acceleration
float mapped[3];
this->map_axes_(mapped, raw.acceleration);
float mx = mapped[X_AXIS];
float my = mapped[Y_AXIS];
float h = std::sqrt(mx * mx + my * my);
if (h < 0.05f) {
ESP_LOGW(TAG, "calibrate_heading: device must be tilted (XY magnitude %.3f too small)", h);
return false;
}
// Rotation angle in the XY plane: eliminate Y component while preserving X sign.
// Without the sign correction, atan2(my,mx) would rotate everything to +X,
// flipping the sign when the tilt projects onto -X.
float sign_mx = mx >= 0 ? 1.0f : -1.0f;
float cos_phi = sign_mx * mx / h; // = |mx| / h
float sin_phi = sign_mx * my / h;
// Compose Rz(-phi) with the current matrix
// Rz(-phi) = [[cos_phi, sin_phi, 0], [-sin_phi, cos_phi, 0], [0, 0, 1]]
float old[9];
memcpy(old, this->matrix_, sizeof(old));
this->matrix_[0] = cos_phi * old[0] + sin_phi * old[3];
this->matrix_[1] = cos_phi * old[1] + sin_phi * old[4];
this->matrix_[2] = cos_phi * old[2] + sin_phi * old[5];
this->matrix_[3] = -sin_phi * old[0] + cos_phi * old[3];
this->matrix_[4] = -sin_phi * old[1] + cos_phi * old[4];
this->matrix_[5] = -sin_phi * old[2] + cos_phi * old[5];
// Row 2 unchanged
ESP_LOGI(TAG, "Heading calibration applied (mapped accel: [%.3f, %.3f, %.3f])", mapped[X_AXIS], mapped[Y_AXIS],
mapped[Z_AXIS]);
log_matrix(this->matrix_);
return true;
}
} // namespace esphome::motion

View File

@@ -0,0 +1,154 @@
#pragma once
#include "esphome/core/automation.h"
#include "esphome/core/component.h"
#include "esphome/core/helpers.h"
#include "esphome/core/preferences.h"
#include <array>
#include <cmath>
#include <numbers> // required for generated lambda code
namespace esphome::motion {
// ---Data class
struct MotionData {
float acceleration[3]{NAN, NAN, NAN};
float angular_rate[3]{NAN, NAN, NAN};
// TODO - compass
};
// indices into data arrays
static constexpr uint8_t X_AXIS = 0;
static constexpr uint8_t Y_AXIS = 1;
static constexpr uint8_t Z_AXIS = 2;
// Persisted calibration. `base_hash` ties the stored matrix to the build-time
// (axis_map / transform_matrix) base; if the base changes the saved calibration
// is ignored. Stored under a stable, ID-derived key so it overwrites in place.
struct CalibrationPref {
uint32_t base_hash;
float matrix[9];
} PACKED;
// Main component class
class MotionComponent : public PollingComponent {
public:
// Lifecycle
void setup() override;
void update() override;
void dump_config() override;
float get_setup_priority() const override { return setup_priority::DATA; }
void set_matrix(const std::array<float, 9> &m) {
memcpy(this->base_matrix_, m.data(), sizeof(this->base_matrix_));
memcpy(this->matrix_, m.data(), sizeof(this->matrix_));
}
void set_calibration_key(uint32_t key) { this->pref_key_ = key; }
/// Calibrate the matrix so the current reading maps to [0, 0, 1] (device flat).
bool calibrate_level();
/// Assuming Y-axis rotation only, correct the heading so X/Y align correctly.
bool calibrate_heading();
/// Save the current matrix to NVS.
bool save_calibration();
/// Restore the build-time (axis_map / transform_matrix) base, discarding calibration.
void clear_calibration();
template<typename F> void add_listener(F &&cb) { this->motion_data_callback_.add(std::forward<F>(cb)); }
protected:
// platforms must implement this method to update raw data.
virtual bool update_data(MotionData &data) = 0;
// for mapping axes
float matrix_[9]{
1, 0, 0, 0, 1, 0, 0, 0, 1,
};
// build-time base (axis_map / transform_matrix); used to detect config changes
// and to restore on clear_calibration().
float base_matrix_[9]{
1, 0, 0, 0, 1, 0, 0, 0, 1,
};
void map_axes_(float output[3], const float input[3]) const {
output[0] = input[X_AXIS] * this->matrix_[0] + input[Y_AXIS] * this->matrix_[1] + input[Z_AXIS] * this->matrix_[2];
output[1] = input[X_AXIS] * this->matrix_[3] + input[Y_AXIS] * this->matrix_[4] + input[Z_AXIS] * this->matrix_[5];
output[2] = input[X_AXIS] * this->matrix_[6] + input[Y_AXIS] * this->matrix_[7] + input[Z_AXIS] * this->matrix_[8];
}
LazyCallbackManager<void(MotionData &)> motion_data_callback_{};
uint32_t pref_key_{0};
uint32_t base_hash_{0}; // hash of base_matrix_, captured in setup()
ESPPreferenceObject pref_{};
};
// --- Actions ---
template<typename... Ts> class CalibrateLevelAction : public Action<Ts...> {
public:
explicit CalibrateLevelAction(MotionComponent *parent) : parent_(parent) {}
void set_save(bool save) { this->save_ = save; }
Trigger<> *get_success_trigger() { return &this->success_trigger_; }
Trigger<> *get_error_trigger() { return &this->error_trigger_; }
protected:
void play(const Ts &...) override {
if (this->parent_->calibrate_level()) {
// if not saving, calibration success is enough. If save required only report success after that succeeds too.
if (!this->save_ || this->parent_->save_calibration()) {
this->success_trigger_.trigger();
return;
}
}
this->error_trigger_.trigger();
}
MotionComponent *parent_;
Trigger<> success_trigger_;
Trigger<> error_trigger_;
bool save_{false};
};
template<typename... Ts> class CalibrateHeadingAction : public Action<Ts...> {
public:
explicit CalibrateHeadingAction(MotionComponent *parent) : parent_(parent) {}
void set_save(bool save) { this->save_ = save; }
Trigger<> *get_success_trigger() { return &this->success_trigger_; }
Trigger<> *get_error_trigger() { return &this->error_trigger_; }
protected:
void play(const Ts &...) override {
if (this->parent_->calibrate_heading()) {
// if not saving, calibration success is enough. If save required only report success after that succeeds too.
if (!this->save_ || this->parent_->save_calibration()) {
this->success_trigger_.trigger();
return;
}
}
this->error_trigger_.trigger();
}
MotionComponent *parent_;
Trigger<> success_trigger_;
Trigger<> error_trigger_;
bool save_{false};
};
template<typename... Ts> class ClearCalibrationAction : public Action<Ts...> {
public:
explicit ClearCalibrationAction(MotionComponent *parent) : parent_(parent) {}
void set_save(bool save) { this->save_ = save; }
protected:
void play(const Ts &...) override {
this->parent_->clear_calibration();
if (this->save_)
this->parent_->save_calibration();
}
MotionComponent *parent_;
bool save_{false};
};
} // namespace esphome::motion

View File

@@ -0,0 +1,128 @@
# YAML config keys
import esphome.codegen as cg
from esphome.components import sensor
import esphome.config_validation as cv
from esphome.const import (
CONF_TYPE,
ICON_ACCELERATION,
ICON_ROTATE_RIGHT,
STATE_CLASS_MEASUREMENT,
UNIT_DEGREE_PER_SECOND,
UNIT_DEGREES,
UNIT_G,
)
from esphome.cpp_generator import MockObj
from esphome.cpp_types import std_ns
import esphome.final_validate as fv
from . import (
AXES,
CONF_MOTION_ID,
KEY_ACCELEROMETER,
KEY_GYROSCOPE,
SENSOR_SCHEMA,
motion_ns,
)
MotionData = motion_ns.class_("MotionData")
CONF_PITCH = "pitch"
CONF_ROLL = "roll"
ICON_SEESAW = "mdi:seesaw"
def _accel_sensor_schema():
return sensor.sensor_schema(
unit_of_measurement=UNIT_G,
icon=ICON_ACCELERATION,
accuracy_decimals=2,
state_class=STATE_CLASS_MEASUREMENT,
).extend(SENSOR_SCHEMA)
def _gyro_sensor_schema():
return sensor.sensor_schema(
unit_of_measurement=UNIT_DEGREE_PER_SECOND,
icon=ICON_ROTATE_RIGHT,
accuracy_decimals=2,
state_class=STATE_CLASS_MEASUREMENT,
).extend(SENSOR_SCHEMA)
def _level_sensor_schema():
return sensor.sensor_schema(
unit_of_measurement=UNIT_DEGREES,
icon=ICON_SEESAW,
accuracy_decimals=2,
state_class=STATE_CLASS_MEASUREMENT,
).extend(SENSOR_SCHEMA)
_ACCELERATIONS = ["acceleration_" + a for a in AXES]
_GYROSCOPES = ["gyroscope_" + g for g in AXES]
_ANGULAR_RATES = ["angular_rate_" + r for r in AXES]
CONFIG_SCHEMA = cv.typed_schema(
{
**{x: _accel_sensor_schema() for x in _ACCELERATIONS},
**{x: _gyro_sensor_schema() for x in _GYROSCOPES},
**{x: _gyro_sensor_schema() for x in _ANGULAR_RATES},
**{x: _level_sensor_schema() for x in (CONF_PITCH, CONF_ROLL)},
}
)
def _final_validate(config: dict) -> None:
full_config = fv.full_config.get()
motion_path = full_config.get_path_for_id(config[CONF_MOTION_ID])[:-1]
motion_config = full_config.get_config_for_path(motion_path)
has_accel = motion_config.get(KEY_ACCELEROMETER, False)
has_gyro = motion_config.get(KEY_GYROSCOPE, False)
sensor_type = config[CONF_TYPE]
if (
sensor_type in _ACCELERATIONS or sensor_type in (CONF_ROLL, CONF_PITCH)
) and not has_accel:
raise cv.Invalid(
"The motion device does not measure acceleration", path=[CONF_TYPE]
)
if (sensor_type in _GYROSCOPES or sensor_type in _ANGULAR_RATES) and not has_gyro:
raise cv.Invalid(
"The motion device does not measure angular rate", path=[CONF_TYPE]
)
FINAL_VALIDATE_SCHEMA = _final_validate
def build_sensor_expr(sensor_type: str, data: MockObj) -> MockObj:
"""Build the C++ expression for a motion sensor type."""
# Note that <numbers> is included via this component's header file.
pif = std_ns.namespace("numbers").pi_v.template(cg.float_)
if sensor_type == CONF_ROLL:
ay = data.acceleration[1]
az = data.acceleration[2]
return std_ns.atan2(ay, az) * (180.0 / pif)
if sensor_type == CONF_PITCH:
ax = data.acceleration[0]
ay = data.acceleration[1]
az = data.acceleration[2]
return std_ns.atan2(-ax, std_ns.sqrt(ay * ay + az * az)) * (180.0 / pif)
sensor_offset = AXES.index(sensor_type[-1:])
if sensor_type in _GYROSCOPES:
sensor_type = _ANGULAR_RATES[sensor_offset]
return getattr(data, str(sensor_type[:-2]))[sensor_offset]
async def to_code(config):
sensor_type = config[CONF_TYPE]
var = await sensor.new_sensor(config)
parent = await cg.get_variable(config[CONF_MOTION_ID])
data = MockObj("data")
expr = build_sensor_expr(sensor_type, data)
value_lambda = await cg.process_lambda(
var.publish_state(expr),
[(MotionData.operator("ref"), str(data))],
)
cg.add(parent.add_listener(value_lambda))

View File

View File

@@ -0,0 +1,899 @@
"""Tests for the motion component."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from voluptuous import Invalid, MultipleInvalid
from esphome.components.motion import (
CALIBRATE_ACTION_SCHEMA,
CLEAR_ACTION_SCHEMA,
CONF_AXIS_MAP,
CONF_SAVE,
CONF_TRANSFORM_MATRIX,
_axis_map,
_axis_map_to_matrix,
_build_calibrate_action,
_transform_matrix,
_validate_matrix_options,
clear_calibration_to_code,
)
from esphome.components.motion.sensor import (
_ACCELERATIONS,
_ANGULAR_RATES,
_GYROSCOPES,
CONF_PITCH,
CONF_ROLL,
CONFIG_SCHEMA,
build_sensor_expr,
)
from esphome.const import CONF_ID, CONF_ON_ERROR, CONF_ON_SUCCESS
from esphome.cpp_generator import MockObj
# --- Axis map validation ---
class TestAxisMapValidation:
"""Tests for the _axis_map validator."""
def test_identity_map(self):
result = _axis_map({"x": "x", "y": "y", "z": "z"})
assert result == {"x": "x", "y": "y", "z": "z"}
def test_axis_swap(self):
result = _axis_map({"x": "y", "y": "z", "z": "x"})
assert result == {"x": "y", "y": "z", "z": "x"}
def test_negation(self):
result = _axis_map({"x": "-y", "y": "z", "z": "x"})
assert result == {"x": "-y", "y": "z", "z": "x"}
def test_plus_prefix(self):
result = _axis_map({"x": "+y", "y": "z", "z": "x"})
assert result == {"x": "+y", "y": "z", "z": "x"}
def test_case_insensitive(self):
result = _axis_map({"x": "X", "y": "Y", "z": "Z"})
assert result == {"x": "X", "y": "Y", "z": "Z"}
def test_invalid_axis_value(self):
with pytest.raises(MultipleInvalid):
_axis_map({"x": "a", "y": "y", "z": "z"})
def test_duplicate_mapping(self):
with pytest.raises(MultipleInvalid):
_axis_map({"x": "x", "y": "x", "z": "z"})
def test_all_same_axis(self):
with pytest.raises(MultipleInvalid):
_axis_map({"x": "x", "y": "x", "z": "x"})
def test_empty_value(self):
with pytest.raises(MultipleInvalid):
_axis_map({"x": "", "y": "y", "z": "z"})
def test_invalid_and_duplicate(self):
"""Both invalid value and duplicate should produce multiple errors."""
with pytest.raises(MultipleInvalid) as exc_info:
_axis_map({"x": "a", "y": "x", "z": "z"})
# Should have at least the invalid regex error and the duplicate error
assert len(exc_info.value.errors) >= 2
# --- Transform matrix validation ---
class TestTransformMatrix:
"""Tests for the _transform_matrix validator."""
def test_flat_identity(self):
result = _transform_matrix([1, 0, 0, 0, 1, 0, 0, 0, 1])
assert result == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
def test_flat_values_converted_to_float(self):
result = _transform_matrix([1, 2, 3, 4, 5, 6, 7, 8, 9])
assert all(isinstance(v, float) for v in result)
def test_nested_3x3(self):
result = _transform_matrix([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
assert result == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
def test_nested_3x3_values(self):
result = _transform_matrix(
[[0.5, 0.1, -0.2], [-0.1, 0.9, 0.3], [0.2, -0.3, 0.8]]
)
assert len(result) == 9
assert result[0] == pytest.approx(0.5)
assert result[3] == pytest.approx(-0.1)
assert result[8] == pytest.approx(0.8)
def test_flat_wrong_length_short(self):
with pytest.raises(Invalid, match="exactly 9"):
_transform_matrix([1, 0, 0])
def test_flat_wrong_length_long(self):
with pytest.raises(Invalid, match="exactly 9"):
_transform_matrix([1] * 12)
def test_nested_wrong_row_count(self):
with pytest.raises(Invalid, match="3 rows"):
_transform_matrix([[1, 0, 0], [0, 1, 0]])
def test_nested_wrong_column_count(self):
with pytest.raises(Invalid, match="3 numbers"):
_transform_matrix([[1, 0], [0, 1, 0], [0, 0, 1]])
def test_empty_list(self):
with pytest.raises(Invalid):
_transform_matrix([])
def test_not_a_list(self):
with pytest.raises(Invalid):
_transform_matrix("identity")
class TestValidateMatrixOptions:
"""Tests for mutual exclusivity of axis_map and transform_matrix."""
def test_neither_passes(self):
config = {"some_key": "value"}
assert _validate_matrix_options(config) is config
def test_axis_map_only_passes(self):
config = {CONF_AXIS_MAP: {"x": "x", "y": "y", "z": "z"}}
assert _validate_matrix_options(config) is config
def test_transform_matrix_only_passes(self):
config = {CONF_TRANSFORM_MATRIX: [1, 0, 0, 0, 1, 0, 0, 0, 1]}
assert _validate_matrix_options(config) is config
def test_both_raises(self):
config = {
CONF_AXIS_MAP: {"x": "x", "y": "y", "z": "z"},
CONF_TRANSFORM_MATRIX: [1, 0, 0, 0, 1, 0, 0, 0, 1],
}
with pytest.raises(Invalid, match="mutually exclusive"):
_validate_matrix_options(config)
# --- Axis map to matrix ---
class TestAxisMapToMatrix:
"""Tests for _axis_map_to_matrix conversion."""
def test_identity(self):
assert _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) == [
1,
0,
0,
0,
1,
0,
0,
0,
1,
]
def test_swap_xy(self):
# x←y, y←x, z←z
assert _axis_map_to_matrix({"x": "y", "y": "x", "z": "z"}) == [
0,
1,
0,
1,
0,
0,
0,
0,
1,
]
def test_rotate_xyz(self):
# x←y, y←z, z←x
assert _axis_map_to_matrix({"x": "y", "y": "z", "z": "x"}) == [
0,
1,
0,
0,
0,
1,
1,
0,
0,
]
def test_negate_x(self):
assert _axis_map_to_matrix({"x": "-x", "y": "y", "z": "z"}) == [
-1,
0,
0,
0,
1,
0,
0,
0,
1,
]
def test_negate_z(self):
assert _axis_map_to_matrix({"x": "x", "y": "y", "z": "-z"}) == [
1,
0,
0,
0,
1,
0,
0,
0,
-1,
]
def test_swap_and_negate(self):
# x←-y, y←z, z←x
assert _axis_map_to_matrix({"x": "-y", "y": "z", "z": "x"}) == [
0,
-1,
0,
0,
0,
1,
1,
0,
0,
]
def test_plus_prefix_ignored(self):
assert _axis_map_to_matrix({"x": "+y", "y": "z", "z": "x"}) == [
0,
1,
0,
0,
0,
1,
1,
0,
0,
]
# --- Sensor expression generation ---
def _expr_str(sensor_type: str) -> str:
"""Build a sensor expression via the production function and return its string form."""
return str(build_sensor_expr(sensor_type, MockObj("data")))
class TestSensorExpressions:
"""Tests that sensor code generation produces correct C++ expressions."""
@pytest.mark.parametrize(
"sensor_type,expected_index",
[
("acceleration_x", 0),
("acceleration_y", 1),
("acceleration_z", 2),
],
)
def test_acceleration_sensors(self, sensor_type, expected_index):
assert _expr_str(sensor_type) == f"data.acceleration[{expected_index}]"
@pytest.mark.parametrize(
"sensor_type,expected_index",
[
("angular_rate_x", 0),
("angular_rate_y", 1),
("angular_rate_z", 2),
],
)
def test_angular_rate_sensors(self, sensor_type, expected_index):
assert _expr_str(sensor_type) == f"data.angular_rate[{expected_index}]"
@pytest.mark.parametrize(
"sensor_type,expected_index",
[
("gyroscope_x", 0),
("gyroscope_y", 1),
("gyroscope_z", 2),
],
)
def test_gyroscope_maps_to_angular_rate(self, sensor_type, expected_index):
"""Gyroscope sensor types should be remapped to angular_rate in the expression."""
assert _expr_str(sensor_type) == f"data.angular_rate[{expected_index}]"
def test_roll_expression(self):
expr = _expr_str("roll")
assert "std::atan2" in expr
assert "data.acceleration[1]" in expr
assert "data.acceleration[2]" in expr
assert "180.0f" in expr
assert "std::numbers::pi_v<float>" in expr
# Roll should NOT reference acceleration[0]
assert "data.acceleration[0]" not in expr
def test_pitch_expression(self):
expr = _expr_str("pitch")
assert "std::atan2" in expr
assert "std::sqrt" in expr
# All three axes used
assert "data.acceleration[0]" in expr
assert "data.acceleration[1]" in expr
assert "data.acceleration[2]" in expr
assert "180.0f" in expr
assert "std::numbers::pi_v<float>" in expr
# Pitch negates the x component
assert "(-data.acceleration[0])" in expr
# --- Calibration math ---
#
# Pure-Python reimplementation of the C++ calibration algorithms so we can
# verify the mathematical properties without needing to compile C++.
def _mat_vec(m: list[float], v: list[float]) -> list[float]:
"""Multiply a row-major 3x3 matrix by a 3-vector."""
return [
m[0] * v[0] + m[1] * v[1] + m[2] * v[2],
m[3] * v[0] + m[4] * v[1] + m[5] * v[2],
m[6] * v[0] + m[7] * v[1] + m[8] * v[2],
]
def _mat_mul(a: list[float], b: list[float]) -> list[float]:
"""Multiply two row-major 3x3 matrices."""
r = [0.0] * 9
for i in range(3):
for j in range(3):
r[i * 3 + j] = sum(a[i * 3 + k] * b[k * 3 + j] for k in range(3))
return r
def _transpose(m: list[float]) -> list[float]:
"""Transpose a row-major 3x3 matrix."""
return [m[0], m[3], m[6], m[1], m[4], m[7], m[2], m[5], m[8]]
def _det(m: list[float]) -> float:
"""Determinant of a 3x3 matrix."""
return (
m[0] * (m[4] * m[8] - m[5] * m[7])
- m[1] * (m[3] * m[8] - m[5] * m[6])
+ m[2] * (m[3] * m[7] - m[4] * m[6])
)
def _calibrate_level(
raw: list[float], matrix: list[float] | None = None
) -> list[float]:
"""Python port of MotionComponent::calibrate_level.
Composes the correction with *matrix* (defaults to identity).
"""
import math
if matrix is None:
matrix = list(IDENTITY)
# Apply current matrix first
mapped = _mat_vec(matrix, raw)
nx, ny, nz = mapped
mag = math.sqrt(nx * nx + ny * ny + nz * nz)
nx /= mag
ny /= mag
nz /= mag
if nz > 0.9999:
return matrix[:] # already aligned, preserve existing matrix
if nz < -0.9999:
r = [1, 0, 0, 0, -1, 0, 0, 0, -1]
else:
f = 1.0 / (1.0 + nz)
r = [
1.0 - nx * nx * f,
-nx * ny * f,
-nx,
-nx * ny * f,
1.0 - ny * ny * f,
-ny,
nx,
ny,
nz,
]
return _mat_mul(r, matrix)
def _calibrate_heading(matrix: list[float], raw: list[float]) -> list[float]:
"""Python port of MotionComponent::calibrate_heading."""
import math
mapped = _mat_vec(matrix, raw)
mx, my = mapped[0], mapped[1]
h = math.sqrt(mx * mx + my * my)
sign_mx = 1.0 if mx >= 0 else -1.0
cos_phi = sign_mx * mx / h # = |mx| / h
sin_phi = sign_mx * my / h
old = matrix[:]
new = old[:]
new[0] = cos_phi * old[0] + sin_phi * old[3]
new[1] = cos_phi * old[1] + sin_phi * old[4]
new[2] = cos_phi * old[2] + sin_phi * old[5]
new[3] = -sin_phi * old[0] + cos_phi * old[3]
new[4] = -sin_phi * old[1] + cos_phi * old[4]
new[5] = -sin_phi * old[2] + cos_phi * old[5]
return new
IDENTITY = [1, 0, 0, 0, 1, 0, 0, 0, 1]
class TestCalibrateLevel:
"""Verify the Rodrigues-based level calibration matrix."""
def _assert_maps_to_z(self, raw: list[float]) -> list[float]:
"""Assert that the calibration matrix maps raw to [0, 0, 1]."""
import math
m = _calibrate_level(raw)
mag = math.sqrt(sum(v * v for v in raw))
norm = [v / mag for v in raw]
result = _mat_vec(m, norm)
assert result[0] == pytest.approx(0, abs=1e-6)
assert result[1] == pytest.approx(0, abs=1e-6)
assert result[2] == pytest.approx(1, abs=1e-6)
return m
def test_already_flat(self):
m = _calibrate_level([0, 0, 1.0])
assert m == IDENTITY
def test_preserves_existing_matrix_when_flat(self):
"""If already flat after axis mapping, level cal should not change the matrix."""
swap = [0, 1, 0, 1, 0, 0, 0, 0, 1] # swap X↔Y
m = _calibrate_level([0, 0, 1.0], swap)
assert m == swap
def test_composes_with_existing_matrix(self):
"""Level calibration should correct tilt while preserving an existing axis swap."""
import math
swap = [0, 1, 0, 1, 0, 0, 0, 0, 1] # swap X↔Y
# Tilted raw: gravity has X component in raw frame
raw = [0.3, 0.0, 0.954]
m = _calibrate_level(raw, swap)
# After calibration, current raw should map to [0, 0, ~1]
mag = math.sqrt(sum(v * v for v in raw))
norm = [v / mag for v in raw]
result = _mat_vec(m, norm)
assert result[0] == pytest.approx(0, abs=1e-5)
assert result[1] == pytest.approx(0, abs=1e-5)
assert result[2] == pytest.approx(1, abs=1e-5)
# Result should differ from calibrating without the swap
m_no_swap = _calibrate_level(raw)
assert m != m_no_swap
def test_upside_down(self):
m = _calibrate_level([0, 0, -1.0])
# 180° about X
assert m == [1, 0, 0, 0, -1, 0, 0, 0, -1]
result = _mat_vec(m, [0, 0, -1])
assert result[2] == pytest.approx(1, abs=1e-6)
def test_gravity_along_x(self):
self._assert_maps_to_z([1.0, 0, 0])
def test_gravity_along_neg_x(self):
self._assert_maps_to_z([-1.0, 0, 0])
def test_gravity_along_y(self):
self._assert_maps_to_z([0, 1.0, 0])
def test_tilted_45_degrees(self):
import math
self._assert_maps_to_z(
[math.sin(math.radians(45)), 0, math.cos(math.radians(45))]
)
def test_arbitrary_vector(self):
self._assert_maps_to_z([0.3, -0.5, 0.81])
def test_unnormalized_input(self):
"""Input does not need to be unit length."""
self._assert_maps_to_z([0.6, -1.0, 1.62])
@pytest.mark.parametrize(
"raw",
[
[1.0, 0, 0],
[0, 1.0, 0],
[0.3, -0.5, 0.81],
[-0.7, 0.4, 0.59],
],
)
def test_result_is_proper_rotation(self, raw):
"""The resulting matrix should be orthogonal with determinant +1."""
m = _calibrate_level(raw)
# R^T * R ≈ I
product = _mat_mul(_transpose(m), m)
for i in range(9):
expected = 1.0 if i % 4 == 0 else 0.0
assert product[i] == pytest.approx(expected, abs=1e-6)
# det ≈ 1
assert _det(m) == pytest.approx(1.0, abs=1e-6)
class TestCalibrateHeading:
"""Verify the Z-rotation heading correction."""
def test_y_axis_tilt_no_heading_error(self):
"""Device tilted purely around Y — heading should already be correct."""
import math
flat_raw = [0, 0, 1.0]
level_m = _calibrate_level(flat_raw)
# Tilt 30° around Y: gravity = [-sin30, 0, cos30]
tilted_raw = [-math.sin(math.radians(30)), 0, math.cos(math.radians(30))]
heading_m = _calibrate_heading(level_m, tilted_raw)
# Matrix should barely change since there's no Y component
for i in range(9):
assert heading_m[i] == pytest.approx(level_m[i], abs=1e-6)
def test_corrects_heading_rotation(self):
"""After level+heading calibration, mapped Y should be ~0 when tilted."""
import math
# Simulate a sensor whose chip is rotated 30° around Z relative to enclosure
angle = math.radians(30)
# When the enclosure is flat, the raw reading is [0, 0, 1] regardless of Z rotation
level_m = _calibrate_level([0, 0, 1.0])
# When tilted around the enclosure's Y axis, the raw reading in the
# chip frame has both X and Y components due to the Z-rotation offset
tilt = math.radians(20)
# In enclosure frame: [-sin(tilt), 0, cos(tilt)]
# Rotated by Z-angle into chip frame:
ex = -math.sin(tilt) * math.cos(angle)
ey = -math.sin(tilt) * math.sin(angle)
ez = math.cos(tilt)
tilted_raw = [ex, ey, ez]
heading_m = _calibrate_heading(level_m, tilted_raw)
# After correction, mapped Y should be 0
result = _mat_vec(heading_m, tilted_raw)
assert result[1] == pytest.approx(0, abs=1e-6)
# Z should still be correct
assert result[2] == pytest.approx(math.cos(tilt), abs=1e-6)
def test_full_calibration_sequence(self):
"""End-to-end: level then heading produces correct frame alignment."""
import math
# Chip is mounted tilted 15° around Y and 25° around Z
# Build the chip-to-enclosure rotation: Rz(25°) * Ry(15°)
yz = math.radians(25)
yy = math.radians(15)
# Ry(yy)
ry = [
math.cos(yy),
0,
math.sin(yy),
0,
1,
0,
-math.sin(yy),
0,
math.cos(yy),
]
# Rz(yz)
rz = [
math.cos(yz),
-math.sin(yz),
0,
math.sin(yz),
math.cos(yz),
0,
0,
0,
1,
]
chip_rot = _mat_mul(rz, ry) # chip orientation in enclosure frame
# Inverse (transpose) maps enclosure vectors to chip readings
chip_rot_inv = _transpose(chip_rot)
# Step 1: Device flat — gravity in enclosure frame is [0, 0, 1]
flat_raw = _mat_vec(chip_rot_inv, [0, 0, 1])
level_m = _calibrate_level(flat_raw)
# After level calibration, flat reading should map to [0, 0, 1]
check_flat = _mat_vec(level_m, flat_raw)
assert check_flat[0] == pytest.approx(0, abs=1e-5)
assert check_flat[1] == pytest.approx(0, abs=1e-5)
assert check_flat[2] == pytest.approx(1, abs=1e-5)
# Step 2: Tilt enclosure around Y by 20°
tilt = math.radians(20)
tilted_enclosure = [-math.sin(tilt), 0, math.cos(tilt)]
tilted_raw = _mat_vec(chip_rot_inv, tilted_enclosure)
heading_m = _calibrate_heading(level_m, tilted_raw)
# After heading calibration, the mapped reading should be
# [-sin(tilt), 0, cos(tilt)] — all horizontal component in X
result = _mat_vec(heading_m, tilted_raw)
assert result[0] == pytest.approx(-math.sin(tilt), abs=1e-5)
assert result[1] == pytest.approx(0, abs=1e-5)
assert result[2] == pytest.approx(math.cos(tilt), abs=1e-5)
@pytest.mark.parametrize(
"raw",
[
[0.3, -0.5, 0.81],
[-0.7, 0.4, 0.59],
],
)
def test_heading_preserves_orthogonality(self, raw):
"""Heading correction composed with level should remain a proper rotation."""
level_m = _calibrate_level(raw)
# Create a tilted reading for heading calibration
tilt_raw = [v + 0.3 for v in raw] # perturb to get XY component
heading_m = _calibrate_heading(level_m, tilt_raw)
product = _mat_mul(_transpose(heading_m), heading_m)
for i in range(9):
expected = 1.0 if i % 4 == 0 else 0.0
assert product[i] == pytest.approx(expected, abs=1e-5)
assert _det(heading_m) == pytest.approx(1.0, abs=1e-5)
# --- Calibration action schema & codegen ---
class TestCalibrateActionSchema:
"""Tests for the CALIBRATE_ACTION_SCHEMA used by both calibration actions."""
def test_schema_accepts_on_success_key(self):
"""on_success must be a recognised optional key."""
schema_keys = {str(k) for k in CALIBRATE_ACTION_SCHEMA.schema}
assert CONF_ON_SUCCESS in schema_keys
def test_schema_accepts_on_error_key(self):
"""on_error must be a recognised optional key."""
schema_keys = {str(k) for k in CALIBRATE_ACTION_SCHEMA.schema}
assert CONF_ON_ERROR in schema_keys
@pytest.fixture
def mock_codegen():
"""Mock cg and automation functions used by _build_calibrate_action."""
mock_var = MagicMock()
mock_parent = MagicMock()
with (
patch(
"esphome.components.motion.cg.get_variable",
new_callable=AsyncMock,
return_value=mock_parent,
) as mock_get_var,
patch(
"esphome.components.motion.cg.new_Pvariable",
return_value=mock_var,
) as mock_new_pvar,
patch(
"esphome.components.motion.automation.build_automation",
new_callable=AsyncMock,
) as mock_build_auto,
):
yield {
"get_variable": mock_get_var,
"new_Pvariable": mock_new_pvar,
"build_automation": mock_build_auto,
"var": mock_var,
"parent": mock_parent,
}
@pytest.mark.asyncio
async def test_build_calibrate_action_no_triggers(mock_codegen):
"""Without on_success/on_error, build_automation should not be called."""
config = {CONF_ID: MagicMock()}
action_id = MagicMock()
template_arg = MagicMock()
result = await _build_calibrate_action(config, action_id, template_arg, [])
assert result is mock_codegen["var"]
mock_codegen["new_Pvariable"].assert_called_once_with(
action_id, template_arg, mock_codegen["parent"]
)
mock_codegen["build_automation"].assert_not_called()
@pytest.mark.asyncio
async def test_build_calibrate_action_with_on_success(mock_codegen):
"""on_success should wire build_automation to get_success_trigger()."""
on_success_config = MagicMock()
config = {CONF_ID: MagicMock(), CONF_ON_SUCCESS: on_success_config}
await _build_calibrate_action(config, MagicMock(), MagicMock(), [])
mock_codegen["build_automation"].assert_called_once_with(
mock_codegen["var"].get_success_trigger(), [], on_success_config
)
@pytest.mark.asyncio
async def test_build_calibrate_action_with_on_error(mock_codegen):
"""on_error should wire build_automation to get_error_trigger()."""
on_error_config = MagicMock()
config = {CONF_ID: MagicMock(), CONF_ON_ERROR: on_error_config}
await _build_calibrate_action(config, MagicMock(), MagicMock(), [])
mock_codegen["build_automation"].assert_called_once_with(
mock_codegen["var"].get_error_trigger(), [], on_error_config
)
@pytest.mark.asyncio
async def test_build_calibrate_action_with_both_triggers(mock_codegen):
"""Both on_success and on_error should each produce a build_automation call."""
on_success_config = MagicMock()
on_error_config = MagicMock()
config = {
CONF_ID: MagicMock(),
CONF_ON_SUCCESS: on_success_config,
CONF_ON_ERROR: on_error_config,
}
await _build_calibrate_action(config, MagicMock(), MagicMock(), [])
assert mock_codegen["build_automation"].call_count == 2
calls = mock_codegen["build_automation"].call_args_list
# First call: on_success
assert calls[0].args == (
mock_codegen["var"].get_success_trigger(),
[],
on_success_config,
)
# Second call: on_error
assert calls[1].args == (
mock_codegen["var"].get_error_trigger(),
[],
on_error_config,
)
# --- Clear calibration action ---
class TestClearActionSchema:
"""Tests for CLEAR_ACTION_SCHEMA."""
def test_schema_has_save_key(self):
schema_keys = {str(k) for k in CLEAR_ACTION_SCHEMA.schema}
assert CONF_SAVE in schema_keys
def test_save_defaults_to_false(self):
result = CLEAR_ACTION_SCHEMA({CONF_ID: "x"})
assert result[CONF_SAVE] is False
@pytest.fixture
def mock_clear_codegen():
"""Mock cg functions used by clear_calibration_to_code."""
mock_var = MagicMock()
mock_parent = MagicMock()
with (
patch(
"esphome.components.motion.cg.get_variable",
new_callable=AsyncMock,
return_value=mock_parent,
),
patch(
"esphome.components.motion.cg.new_Pvariable",
return_value=mock_var,
) as mock_new_pvar,
patch("esphome.components.motion.cg.add") as mock_add,
):
yield {"new_Pvariable": mock_new_pvar, "add": mock_add, "var": mock_var}
@pytest.mark.asyncio
async def test_clear_action_without_save(mock_clear_codegen):
"""With save=False, set_save should not be emitted."""
config = {CONF_ID: MagicMock(), CONF_SAVE: False}
result = await clear_calibration_to_code(config, MagicMock(), MagicMock(), [])
assert result is mock_clear_codegen["var"]
mock_clear_codegen["add"].assert_not_called()
@pytest.mark.asyncio
async def test_clear_action_with_save(mock_clear_codegen):
"""With save=True, set_save(True) should be emitted exactly once."""
config = {CONF_ID: MagicMock(), CONF_SAVE: True}
await clear_calibration_to_code(config, MagicMock(), MagicMock(), [])
mock_clear_codegen["var"].set_save.assert_called_once_with(True)
mock_clear_codegen["add"].assert_called_once()
# --- Calibration persistence invalidation ---
#
# The C++ side stores a hash of the build-time base matrix alongside the saved
# calibration so a changed axis_map invalidates stale NVS data without orphaning
# storage (the pref key stays ID-stable). These tests pin the design properties
# of that base-matrix fingerprint: deterministic for identical maps, distinct
# for different ones.
def _hash_matrix(matrix: list[float]) -> int:
"""Python port of the C++ hash_matrix() (FNV-1a over the float bytes)."""
import struct
data = struct.pack("<9f", *matrix)
h = 2166136261
for b in data:
h ^= b
h = (h * 16777619) & 0xFFFFFFFF
return h
class TestBaseMatrixHash:
"""Properties of the base-matrix fingerprint used for NVS invalidation."""
def test_identical_axis_maps_hash_equal(self):
a = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"})
b = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"})
assert _hash_matrix([float(v) for v in a]) == _hash_matrix(
[float(v) for v in b]
)
def test_different_axis_maps_hash_differ(self):
identity = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"})
swapped = _axis_map_to_matrix({"x": "y", "y": "x", "z": "z"})
assert _hash_matrix([float(v) for v in identity]) != _hash_matrix(
[float(v) for v in swapped]
)
def test_sign_change_hashes_differ(self):
pos = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"})
neg = _axis_map_to_matrix({"x": "-x", "y": "y", "z": "z"})
assert _hash_matrix([float(v) for v in pos]) != _hash_matrix(
[float(v) for v in neg]
)
# --- Sensor config schema type validation ---
class TestSensorConfigSchema:
"""Tests for sensor CONFIG_SCHEMA type key validation."""
def test_invalid_type_rejected(self):
with pytest.raises((Invalid, MultipleInvalid), match="Unknown value"):
CONFIG_SCHEMA({"type": "invalid_type"})
def test_missing_type_rejected(self):
with pytest.raises((Invalid, MultipleInvalid)):
CONFIG_SCHEMA({})
@pytest.mark.parametrize(
"sensor_type",
_ACCELERATIONS + _GYROSCOPES + _ANGULAR_RATES + [CONF_PITCH, CONF_ROLL],
)
def test_valid_types_accepted(self, sensor_type):
"""Valid sensor types should pass type validation (errors from missing
required fields like motion_id are expected and acceptable)."""
try:
CONFIG_SCHEMA({"type": sensor_type})
except (Invalid, MultipleInvalid) as e:
# Should NOT be a type validation error
assert "Unknown value" not in str(e), (
f"Type '{sensor_type}' was rejected as unknown"
)