diff --git a/CODEOWNERS b/CODEOWNERS index 3c3e502058..abe33f9467 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -351,6 +351,7 @@ esphome/components/modbus_server/* @exciton esphome/components/mopeka_ble/* @Fabian-Schmidt @spbrogan esphome/components/mopeka_pro_check/* @spbrogan esphome/components/mopeka_std_check/* @Fabian-Schmidt +esphome/components/motion/* @esphome/core esphome/components/mpl3115a2/* @kbickar esphome/components/mpu6886/* @fabaff esphome/components/ms8607/* @e28eta diff --git a/esphome/components/motion/__init__.py b/esphome/components/motion/__init__.py new file mode 100644 index 0000000000..aea052fa2f --- /dev/null +++ b/esphome/components/motion/__init__.py @@ -0,0 +1,221 @@ +from collections.abc import Callable +import re + +from esphome import automation +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID, CONF_ON_ERROR, CONF_ON_SUCCESS +from esphome.cpp_generator import MockObj, MockObjClass +from esphome.helpers import fnv1_hash_object_id + +CODEOWNERS = ["@esphome/core"] + +DOMAIN = "motion" +IS_PLATFORM_COMPONENT = True + +# C++ namespace / class +motion_ns = cg.esphome_ns.namespace("motion") +MotionComponent = motion_ns.class_("MotionComponent", cg.PollingComponent) + +AXES = ["x", "y", "z"] + +CONF_AXIS_MAP = "axis_map" +CONF_MOTION_ID = "motion_id" +CONF_TRANSFORM_MATRIX = "transform_matrix" + +CalibrateLevelAction = motion_ns.class_("CalibrateLevelAction", automation.Action) +CalibrateHeadingAction = motion_ns.class_("CalibrateHeadingAction", automation.Action) +ClearCalibrationAction = motion_ns.class_("ClearCalibrationAction", automation.Action) + +KEY_ACCELEROMETER = "accelerometer" +KEY_GYROSCOPE = "gyroscope" + +SENSOR_SCHEMA = cv.Schema( + { + cv.GenerateID(CONF_MOTION_ID): cv.use_id(MotionComponent), + } +) + +_AXIS_REGEX = re.compile(r"^[+-]?[xyz]$", re.IGNORECASE) + + +def _axis_map(config: dict) -> dict: + errors = [] + for key, axis in config.items(): + if _AXIS_REGEX.fullmatch(axis) is None: + errors.append( + cv.Invalid( + "Each 'axis_map' config value must be one of 'x', 'y' or 'z' (optionally preceded by '+' or '-').", + path=[key], + ) + ) + values = {x.lower().removeprefix("-").removeprefix("+") for x in config.values()} + if values != set(AXES): + errors.append(cv.Invalid("Each axis may be mapped only once")) + if errors: + raise cv.MultipleInvalid(errors) + return config + + +def _axis_map_to_matrix(config: dict[str, str]) -> list[float]: + matrix = [] + for target_axis in AXES: + source_axis = config[target_axis].lower() + sign = -1.0 if source_axis.startswith("-") else 1.0 + source_axis = source_axis.removeprefix("+").removeprefix("-") + + row = [0.0, 0.0, 0.0] + row[AXES.index(source_axis)] = sign + matrix.extend(row) + + return matrix + + +def _transform_matrix(value): + """Accept a flat list of 9 floats or a 3x3 nested list.""" + if not isinstance(value, list) or len(value) == 0: + raise cv.Invalid("Expected a list of 9 numbers or a 3x3 nested list") + # Nested 3x3 + if isinstance(value[0], list): + if len(value) != 3: + raise cv.Invalid(f"3x3 matrix must have 3 rows, got {len(value)}") + flat = [] + for i, row in enumerate(value): + if not isinstance(row, list) or len(row) != 3: + raise cv.Invalid("Each row must be a list of 3 numbers", path=[i]) + flat.extend(cv.float_(v) for v in row) + return flat + # Flat list + if len(value) != 9: + raise cv.Invalid(f"Flat matrix must have exactly 9 values, got {len(value)}") + return [cv.float_(v) for v in value] + + +def _validate_matrix_options(config): + if CONF_AXIS_MAP in config and CONF_TRANSFORM_MATRIX in config: + raise cv.Invalid( + f"'{CONF_AXIS_MAP}' and '{CONF_TRANSFORM_MATRIX}' are mutually exclusive" + ) + return config + + +# Top-level CONFIG_SCHEMA +_CONFIG_SCHEMA = ( + cv.Schema( + { + cv.Optional(CONF_AXIS_MAP): cv.All( + {cv.Required(k): cv.string_strict for k in AXES}, + _axis_map, + ), + cv.Optional(CONF_TRANSFORM_MATRIX): _transform_matrix, + } + ) + .extend(cv.polling_component_schema("250ms")) + .add_extra(_validate_matrix_options) +) + + +def _add_data(has_accel: bool, has_gyro: bool) -> Callable[[dict], dict]: + + def validator(config): + config = config.copy() + config[KEY_ACCELEROMETER] = has_accel + config[KEY_GYROSCOPE] = has_gyro + return config + + return validator + + +def motion_schema(class_: MockObjClass, has_accel: bool, has_gyro: bool) -> cv.Schema: + return _CONFIG_SCHEMA.extend( + { + cv.GenerateID(): cv.declare_id(class_), + } + ).add_extra(_add_data(has_accel, has_gyro)) + + +# Code generation +async def register_motion_component(var: MockObj, config) -> None: + await cg.register_component(var, config) + # Set preference key for NVS save/restore (based on component ID) + obj_id = config[CONF_ID].id + pref_hash = fnv1_hash_object_id(obj_id) + cg.add(var.set_calibration_key(pref_hash)) + if axis_map := config.get(CONF_AXIS_MAP): + cg.add(var.set_matrix(_axis_map_to_matrix(axis_map))) + elif transform_matrix := config.get(CONF_TRANSFORM_MATRIX): + cg.add(var.set_matrix(transform_matrix)) + + +async def new_motion_component(config: dict) -> MockObj: + var = cg.new_Pvariable(config[CONF_ID]) + await register_motion_component(var, config) + return var + + +# --- Actions --- + +CONF_SAVE = "save" + +CALIBRATE_ACTION_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.use_id(MotionComponent), + cv.Optional(CONF_SAVE, default=False): cv.boolean, + cv.Optional(CONF_ON_SUCCESS): automation.validate_automation(single=True), + cv.Optional(CONF_ON_ERROR): automation.validate_automation(single=True), + } +) + + +async def _build_calibrate_action(config, action_id, template_arg, args): + parent = await cg.get_variable(config[CONF_ID]) + var = cg.new_Pvariable(action_id, template_arg, parent) + if config.get(CONF_SAVE): + cg.add(var.set_save(True)) + if on_success := config.get(CONF_ON_SUCCESS): + await automation.build_automation(var.get_success_trigger(), [], on_success) + if on_error := config.get(CONF_ON_ERROR): + await automation.build_automation(var.get_error_trigger(), [], on_error) + return var + + +@automation.register_action( + "motion.calibrate_level", + CalibrateLevelAction, + CALIBRATE_ACTION_SCHEMA, + synchronous=True, +) +async def calibrate_level_to_code(config, action_id, template_arg, args): + return await _build_calibrate_action(config, action_id, template_arg, args) + + +@automation.register_action( + "motion.calibrate_heading", + CalibrateHeadingAction, + CALIBRATE_ACTION_SCHEMA, + synchronous=True, +) +async def calibrate_heading_to_code(config, action_id, template_arg, args): + return await _build_calibrate_action(config, action_id, template_arg, args) + + +CLEAR_ACTION_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.use_id(MotionComponent), + cv.Optional(CONF_SAVE, default=False): cv.boolean, + } +) + + +@automation.register_action( + "motion.clear_calibration", + ClearCalibrationAction, + CLEAR_ACTION_SCHEMA, + synchronous=True, +) +async def clear_calibration_to_code(config, action_id, template_arg, args): + parent = await cg.get_variable(config[CONF_ID]) + var = cg.new_Pvariable(action_id, template_arg, parent) + if config.get(CONF_SAVE): + cg.add(var.set_save(True)) + return var diff --git a/esphome/components/motion/motion_component.cpp b/esphome/components/motion/motion_component.cpp new file mode 100644 index 0000000000..8715c8385c --- /dev/null +++ b/esphome/components/motion/motion_component.cpp @@ -0,0 +1,194 @@ +#include "motion_component.h" +#include "esphome/core/log.h" + +namespace esphome::motion { + +static const char *const TAG = "motion"; + +static void log_matrix(const float m[9]) { + ESP_LOGCONFIG(TAG, " Calibration matrix:"); + ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[0], m[1], m[2]); + ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[3], m[4], m[5]); + ESP_LOGCONFIG(TAG, " - [%9.6f, %9.6f, %9.6f]", m[6], m[7], m[8]); +} + +// FNV-1a over the raw bytes of the matrix. Identical axis maps always yield +// bit-identical matrices, so this is a stable fingerprint of the build-time base. +static uint32_t hash_matrix(const float m[9]) { + const uint8_t *bytes = reinterpret_cast(m); + uint32_t hash = 2166136261UL; + for (size_t i = 0; i < sizeof(float) * 9; i++) { + hash ^= bytes[i]; + hash *= 16777619UL; + } + return hash; +} + +void MotionComponent::setup() { + // matrix_ currently holds the build-time base (set_matrix ran during codegen). + this->base_hash_ = hash_matrix(this->base_matrix_); + this->pref_ = global_preferences->make_preference(this->pref_key_); + CalibrationPref saved; + if (this->pref_.load(&saved) && saved.base_hash == this->base_hash_) { + memcpy(this->matrix_, saved.matrix, sizeof(this->matrix_)); + ESP_LOGI(TAG, "Restored calibration from NVS"); + } else { + ESP_LOGD(TAG, "No matching saved calibration; using build-time matrix"); + } + log_matrix(this->matrix_); +} +void MotionComponent::dump_config() { + LOG_UPDATE_INTERVAL(this); + log_matrix(this->matrix_); +} +bool MotionComponent::save_calibration() { + if (this->pref_key_ == 0) { + ESP_LOGW(TAG, "Cannot save calibration: no preference key set"); + return false; + } + CalibrationPref pref{this->base_hash_, {}}; + memcpy(pref.matrix, this->matrix_, sizeof(pref.matrix)); + if (this->pref_.save(&pref)) { + global_preferences->sync(); + ESP_LOGI(TAG, "Saved calibration to NVS"); + return true; + } + ESP_LOGW(TAG, "Calibration save failed"); + return false; +} +void MotionComponent::clear_calibration() { + memcpy(this->matrix_, this->base_matrix_, sizeof(this->matrix_)); + ESP_LOGI(TAG, "Calibration reset to build-time matrix"); + log_matrix(this->matrix_); +} +void MotionComponent::update() { + if (this->is_failed()) + return; + MotionData motion_data{}; + MotionData raw_data{}; + if (!this->update_data(raw_data)) + return; + this->map_axes_(motion_data.acceleration, raw_data.acceleration); + this->map_axes_(motion_data.angular_rate, raw_data.angular_rate); + this->motion_data_callback_.call(motion_data); + + ESP_LOGV(TAG, "Accel: [%.3f, %.3f, %.3f] g; Gyro: [%.3f, %.3f, %.3f] °/s", motion_data.acceleration[X_AXIS], + motion_data.acceleration[Y_AXIS], motion_data.acceleration[Z_AXIS], motion_data.angular_rate[X_AXIS], + motion_data.angular_rate[Y_AXIS], motion_data.angular_rate[Z_AXIS]); +} + +bool MotionComponent::calibrate_level() { + MotionData raw{}; + if (!this->update_data(raw)) { + ESP_LOGW(TAG, "calibrate_level: failed to read sensor data"); + return false; + } + + // Apply the current matrix first so any existing axis mapping is preserved. + float mapped[3]; + this->map_axes_(mapped, raw.acceleration); + + float nx = mapped[X_AXIS]; + float ny = mapped[Y_AXIS]; + float nz = mapped[Z_AXIS]; + float mag = std::sqrt(nx * nx + ny * ny + nz * nz); + if (mag < 0.1f) { + ESP_LOGW(TAG, "calibrate_level: acceleration magnitude too small (%.3f)", mag); + return false; + } + + // Normalize + nx /= mag; + ny /= mag; + nz /= mag; + + // Compute rotation matrix R such that R * [nx, ny, nz] = [0, 0, 1] + // using Rodrigues' rotation formula, then compose with the existing matrix. + if (nz > 0.99999f) { + // Already aligned with +Z — nothing to compose + ESP_LOGI(TAG, "Level calibration: already aligned"); + log_matrix(this->matrix_); + // returning true here will trigger on_success and a save to NVS, but the save will ultimately be a no-op + // since the backend sync will not write unchanged values. + return true; + } + + float r[9]; + if (nz < -0.9999f) { + // Aligned with -Z — 180° rotation about X + float m[9] = {1, 0, 0, 0, -1, 0, 0, 0, -1}; + memcpy(r, m, sizeof(r)); + } else { + float f = 1.0f / (1.0f + nz); + r[0] = 1.0f - nx * nx * f; + r[1] = -nx * ny * f; + r[2] = -nx; + r[3] = -nx * ny * f; + r[4] = 1.0f - ny * ny * f; + r[5] = -ny; + r[6] = nx; + r[7] = ny; + r[8] = nz; + } + + // Compose: new_matrix = R * old_matrix + float old[9]; + memcpy(old, this->matrix_, sizeof(old)); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + this->matrix_[i * 3 + j] = r[i * 3 + 0] * old[j] + r[i * 3 + 1] * old[3 + j] + r[i * 3 + 2] * old[6 + j]; + } + } + + ESP_LOGI(TAG, "Level calibration applied (mapped accel: [%.3f, %.3f, %.3f])", mapped[X_AXIS], mapped[Y_AXIS], + mapped[Z_AXIS]); + log_matrix(this->matrix_); + return true; +} + +bool MotionComponent::calibrate_heading() { + MotionData raw{}; + if (!this->update_data(raw)) { + ESP_LOGW(TAG, "calibrate_heading: failed to read sensor data"); + return false; + } + + // Apply current matrix to get the mapped acceleration + float mapped[3]; + this->map_axes_(mapped, raw.acceleration); + + float mx = mapped[X_AXIS]; + float my = mapped[Y_AXIS]; + float h = std::sqrt(mx * mx + my * my); + if (h < 0.05f) { + ESP_LOGW(TAG, "calibrate_heading: device must be tilted (XY magnitude %.3f too small)", h); + return false; + } + + // Rotation angle in the XY plane: eliminate Y component while preserving X sign. + // Without the sign correction, atan2(my,mx) would rotate everything to +X, + // flipping the sign when the tilt projects onto -X. + float sign_mx = mx >= 0 ? 1.0f : -1.0f; + float cos_phi = sign_mx * mx / h; // = |mx| / h + float sin_phi = sign_mx * my / h; + + // Compose Rz(-phi) with the current matrix + // Rz(-phi) = [[cos_phi, sin_phi, 0], [-sin_phi, cos_phi, 0], [0, 0, 1]] + float old[9]; + memcpy(old, this->matrix_, sizeof(old)); + + this->matrix_[0] = cos_phi * old[0] + sin_phi * old[3]; + this->matrix_[1] = cos_phi * old[1] + sin_phi * old[4]; + this->matrix_[2] = cos_phi * old[2] + sin_phi * old[5]; + this->matrix_[3] = -sin_phi * old[0] + cos_phi * old[3]; + this->matrix_[4] = -sin_phi * old[1] + cos_phi * old[4]; + this->matrix_[5] = -sin_phi * old[2] + cos_phi * old[5]; + // Row 2 unchanged + + ESP_LOGI(TAG, "Heading calibration applied (mapped accel: [%.3f, %.3f, %.3f])", mapped[X_AXIS], mapped[Y_AXIS], + mapped[Z_AXIS]); + log_matrix(this->matrix_); + return true; +} + +} // namespace esphome::motion diff --git a/esphome/components/motion/motion_component.h b/esphome/components/motion/motion_component.h new file mode 100644 index 0000000000..00310c16fe --- /dev/null +++ b/esphome/components/motion/motion_component.h @@ -0,0 +1,154 @@ +#pragma once + +#include "esphome/core/automation.h" +#include "esphome/core/component.h" +#include "esphome/core/helpers.h" +#include "esphome/core/preferences.h" +#include +#include +#include // required for generated lambda code + +namespace esphome::motion { + +// ---Data class + +struct MotionData { + float acceleration[3]{NAN, NAN, NAN}; + float angular_rate[3]{NAN, NAN, NAN}; + // TODO - compass +}; + +// indices into data arrays +static constexpr uint8_t X_AXIS = 0; +static constexpr uint8_t Y_AXIS = 1; +static constexpr uint8_t Z_AXIS = 2; + +// Persisted calibration. `base_hash` ties the stored matrix to the build-time +// (axis_map / transform_matrix) base; if the base changes the saved calibration +// is ignored. Stored under a stable, ID-derived key so it overwrites in place. +struct CalibrationPref { + uint32_t base_hash; + float matrix[9]; +} PACKED; + +// Main component class +class MotionComponent : public PollingComponent { + public: + // Lifecycle + void setup() override; + void update() override; + void dump_config() override; + float get_setup_priority() const override { return setup_priority::DATA; } + + void set_matrix(const std::array &m) { + memcpy(this->base_matrix_, m.data(), sizeof(this->base_matrix_)); + memcpy(this->matrix_, m.data(), sizeof(this->matrix_)); + } + void set_calibration_key(uint32_t key) { this->pref_key_ = key; } + + /// Calibrate the matrix so the current reading maps to [0, 0, 1] (device flat). + bool calibrate_level(); + /// Assuming Y-axis rotation only, correct the heading so X/Y align correctly. + bool calibrate_heading(); + /// Save the current matrix to NVS. + bool save_calibration(); + /// Restore the build-time (axis_map / transform_matrix) base, discarding calibration. + void clear_calibration(); + + template void add_listener(F &&cb) { this->motion_data_callback_.add(std::forward(cb)); } + + protected: + // platforms must implement this method to update raw data. + virtual bool update_data(MotionData &data) = 0; + + // for mapping axes + float matrix_[9]{ + 1, 0, 0, 0, 1, 0, 0, 0, 1, + }; + // build-time base (axis_map / transform_matrix); used to detect config changes + // and to restore on clear_calibration(). + float base_matrix_[9]{ + 1, 0, 0, 0, 1, 0, 0, 0, 1, + }; + + void map_axes_(float output[3], const float input[3]) const { + output[0] = input[X_AXIS] * this->matrix_[0] + input[Y_AXIS] * this->matrix_[1] + input[Z_AXIS] * this->matrix_[2]; + output[1] = input[X_AXIS] * this->matrix_[3] + input[Y_AXIS] * this->matrix_[4] + input[Z_AXIS] * this->matrix_[5]; + output[2] = input[X_AXIS] * this->matrix_[6] + input[Y_AXIS] * this->matrix_[7] + input[Z_AXIS] * this->matrix_[8]; + } + + LazyCallbackManager motion_data_callback_{}; + uint32_t pref_key_{0}; + uint32_t base_hash_{0}; // hash of base_matrix_, captured in setup() + ESPPreferenceObject pref_{}; +}; + +// --- Actions --- + +template class CalibrateLevelAction : public Action { + public: + explicit CalibrateLevelAction(MotionComponent *parent) : parent_(parent) {} + void set_save(bool save) { this->save_ = save; } + Trigger<> *get_success_trigger() { return &this->success_trigger_; } + Trigger<> *get_error_trigger() { return &this->error_trigger_; } + + protected: + void play(const Ts &...) override { + if (this->parent_->calibrate_level()) { + // if not saving, calibration success is enough. If save required only report success after that succeeds too. + if (!this->save_ || this->parent_->save_calibration()) { + this->success_trigger_.trigger(); + return; + } + } + this->error_trigger_.trigger(); + } + + MotionComponent *parent_; + Trigger<> success_trigger_; + Trigger<> error_trigger_; + bool save_{false}; +}; + +template class CalibrateHeadingAction : public Action { + public: + explicit CalibrateHeadingAction(MotionComponent *parent) : parent_(parent) {} + void set_save(bool save) { this->save_ = save; } + Trigger<> *get_success_trigger() { return &this->success_trigger_; } + Trigger<> *get_error_trigger() { return &this->error_trigger_; } + + protected: + void play(const Ts &...) override { + if (this->parent_->calibrate_heading()) { + // if not saving, calibration success is enough. If save required only report success after that succeeds too. + if (!this->save_ || this->parent_->save_calibration()) { + this->success_trigger_.trigger(); + return; + } + } + this->error_trigger_.trigger(); + } + + MotionComponent *parent_; + Trigger<> success_trigger_; + Trigger<> error_trigger_; + bool save_{false}; +}; + +template class ClearCalibrationAction : public Action { + public: + explicit ClearCalibrationAction(MotionComponent *parent) : parent_(parent) {} + void set_save(bool save) { this->save_ = save; } + + protected: + void play(const Ts &...) override { + this->parent_->clear_calibration(); + if (this->save_) + this->parent_->save_calibration(); + } + + MotionComponent *parent_; + bool save_{false}; +}; + +} // namespace esphome::motion diff --git a/esphome/components/motion/sensor.py b/esphome/components/motion/sensor.py new file mode 100644 index 0000000000..ad3163a01a --- /dev/null +++ b/esphome/components/motion/sensor.py @@ -0,0 +1,128 @@ +# YAML config keys +import esphome.codegen as cg +from esphome.components import sensor +import esphome.config_validation as cv +from esphome.const import ( + CONF_TYPE, + ICON_ACCELERATION, + ICON_ROTATE_RIGHT, + STATE_CLASS_MEASUREMENT, + UNIT_DEGREE_PER_SECOND, + UNIT_DEGREES, + UNIT_G, +) +from esphome.cpp_generator import MockObj +from esphome.cpp_types import std_ns +import esphome.final_validate as fv + +from . import ( + AXES, + CONF_MOTION_ID, + KEY_ACCELEROMETER, + KEY_GYROSCOPE, + SENSOR_SCHEMA, + motion_ns, +) + +MotionData = motion_ns.class_("MotionData") + +CONF_PITCH = "pitch" +CONF_ROLL = "roll" +ICON_SEESAW = "mdi:seesaw" + + +def _accel_sensor_schema(): + return sensor.sensor_schema( + unit_of_measurement=UNIT_G, + icon=ICON_ACCELERATION, + accuracy_decimals=2, + state_class=STATE_CLASS_MEASUREMENT, + ).extend(SENSOR_SCHEMA) + + +def _gyro_sensor_schema(): + return sensor.sensor_schema( + unit_of_measurement=UNIT_DEGREE_PER_SECOND, + icon=ICON_ROTATE_RIGHT, + accuracy_decimals=2, + state_class=STATE_CLASS_MEASUREMENT, + ).extend(SENSOR_SCHEMA) + + +def _level_sensor_schema(): + return sensor.sensor_schema( + unit_of_measurement=UNIT_DEGREES, + icon=ICON_SEESAW, + accuracy_decimals=2, + state_class=STATE_CLASS_MEASUREMENT, + ).extend(SENSOR_SCHEMA) + + +_ACCELERATIONS = ["acceleration_" + a for a in AXES] +_GYROSCOPES = ["gyroscope_" + g for g in AXES] +_ANGULAR_RATES = ["angular_rate_" + r for r in AXES] + +CONFIG_SCHEMA = cv.typed_schema( + { + **{x: _accel_sensor_schema() for x in _ACCELERATIONS}, + **{x: _gyro_sensor_schema() for x in _GYROSCOPES}, + **{x: _gyro_sensor_schema() for x in _ANGULAR_RATES}, + **{x: _level_sensor_schema() for x in (CONF_PITCH, CONF_ROLL)}, + } +) + + +def _final_validate(config: dict) -> None: + full_config = fv.full_config.get() + motion_path = full_config.get_path_for_id(config[CONF_MOTION_ID])[:-1] + motion_config = full_config.get_config_for_path(motion_path) + has_accel = motion_config.get(KEY_ACCELEROMETER, False) + has_gyro = motion_config.get(KEY_GYROSCOPE, False) + + sensor_type = config[CONF_TYPE] + if ( + sensor_type in _ACCELERATIONS or sensor_type in (CONF_ROLL, CONF_PITCH) + ) and not has_accel: + raise cv.Invalid( + "The motion device does not measure acceleration", path=[CONF_TYPE] + ) + if (sensor_type in _GYROSCOPES or sensor_type in _ANGULAR_RATES) and not has_gyro: + raise cv.Invalid( + "The motion device does not measure angular rate", path=[CONF_TYPE] + ) + + +FINAL_VALIDATE_SCHEMA = _final_validate + + +def build_sensor_expr(sensor_type: str, data: MockObj) -> MockObj: + """Build the C++ expression for a motion sensor type.""" + + # Note that is included via this component's header file. + pif = std_ns.namespace("numbers").pi_v.template(cg.float_) + if sensor_type == CONF_ROLL: + ay = data.acceleration[1] + az = data.acceleration[2] + return std_ns.atan2(ay, az) * (180.0 / pif) + if sensor_type == CONF_PITCH: + ax = data.acceleration[0] + ay = data.acceleration[1] + az = data.acceleration[2] + return std_ns.atan2(-ax, std_ns.sqrt(ay * ay + az * az)) * (180.0 / pif) + sensor_offset = AXES.index(sensor_type[-1:]) + if sensor_type in _GYROSCOPES: + sensor_type = _ANGULAR_RATES[sensor_offset] + return getattr(data, str(sensor_type[:-2]))[sensor_offset] + + +async def to_code(config): + sensor_type = config[CONF_TYPE] + var = await sensor.new_sensor(config) + parent = await cg.get_variable(config[CONF_MOTION_ID]) + data = MockObj("data") + expr = build_sensor_expr(sensor_type, data) + value_lambda = await cg.process_lambda( + var.publish_state(expr), + [(MotionData.operator("ref"), str(data))], + ) + cg.add(parent.add_listener(value_lambda)) diff --git a/tests/component_tests/motion/__init__.py b/tests/component_tests/motion/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/component_tests/motion/test_motion.py b/tests/component_tests/motion/test_motion.py new file mode 100644 index 0000000000..f2c0f26344 --- /dev/null +++ b/tests/component_tests/motion/test_motion.py @@ -0,0 +1,899 @@ +"""Tests for the motion component.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from voluptuous import Invalid, MultipleInvalid + +from esphome.components.motion import ( + CALIBRATE_ACTION_SCHEMA, + CLEAR_ACTION_SCHEMA, + CONF_AXIS_MAP, + CONF_SAVE, + CONF_TRANSFORM_MATRIX, + _axis_map, + _axis_map_to_matrix, + _build_calibrate_action, + _transform_matrix, + _validate_matrix_options, + clear_calibration_to_code, +) +from esphome.components.motion.sensor import ( + _ACCELERATIONS, + _ANGULAR_RATES, + _GYROSCOPES, + CONF_PITCH, + CONF_ROLL, + CONFIG_SCHEMA, + build_sensor_expr, +) +from esphome.const import CONF_ID, CONF_ON_ERROR, CONF_ON_SUCCESS +from esphome.cpp_generator import MockObj + +# --- Axis map validation --- + + +class TestAxisMapValidation: + """Tests for the _axis_map validator.""" + + def test_identity_map(self): + result = _axis_map({"x": "x", "y": "y", "z": "z"}) + assert result == {"x": "x", "y": "y", "z": "z"} + + def test_axis_swap(self): + result = _axis_map({"x": "y", "y": "z", "z": "x"}) + assert result == {"x": "y", "y": "z", "z": "x"} + + def test_negation(self): + result = _axis_map({"x": "-y", "y": "z", "z": "x"}) + assert result == {"x": "-y", "y": "z", "z": "x"} + + def test_plus_prefix(self): + result = _axis_map({"x": "+y", "y": "z", "z": "x"}) + assert result == {"x": "+y", "y": "z", "z": "x"} + + def test_case_insensitive(self): + result = _axis_map({"x": "X", "y": "Y", "z": "Z"}) + assert result == {"x": "X", "y": "Y", "z": "Z"} + + def test_invalid_axis_value(self): + with pytest.raises(MultipleInvalid): + _axis_map({"x": "a", "y": "y", "z": "z"}) + + def test_duplicate_mapping(self): + with pytest.raises(MultipleInvalid): + _axis_map({"x": "x", "y": "x", "z": "z"}) + + def test_all_same_axis(self): + with pytest.raises(MultipleInvalid): + _axis_map({"x": "x", "y": "x", "z": "x"}) + + def test_empty_value(self): + with pytest.raises(MultipleInvalid): + _axis_map({"x": "", "y": "y", "z": "z"}) + + def test_invalid_and_duplicate(self): + """Both invalid value and duplicate should produce multiple errors.""" + with pytest.raises(MultipleInvalid) as exc_info: + _axis_map({"x": "a", "y": "x", "z": "z"}) + # Should have at least the invalid regex error and the duplicate error + assert len(exc_info.value.errors) >= 2 + + +# --- Transform matrix validation --- + + +class TestTransformMatrix: + """Tests for the _transform_matrix validator.""" + + def test_flat_identity(self): + result = _transform_matrix([1, 0, 0, 0, 1, 0, 0, 0, 1]) + assert result == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] + + def test_flat_values_converted_to_float(self): + result = _transform_matrix([1, 2, 3, 4, 5, 6, 7, 8, 9]) + assert all(isinstance(v, float) for v in result) + + def test_nested_3x3(self): + result = _transform_matrix([[1, 0, 0], [0, 1, 0], [0, 0, 1]]) + assert result == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] + + def test_nested_3x3_values(self): + result = _transform_matrix( + [[0.5, 0.1, -0.2], [-0.1, 0.9, 0.3], [0.2, -0.3, 0.8]] + ) + assert len(result) == 9 + assert result[0] == pytest.approx(0.5) + assert result[3] == pytest.approx(-0.1) + assert result[8] == pytest.approx(0.8) + + def test_flat_wrong_length_short(self): + with pytest.raises(Invalid, match="exactly 9"): + _transform_matrix([1, 0, 0]) + + def test_flat_wrong_length_long(self): + with pytest.raises(Invalid, match="exactly 9"): + _transform_matrix([1] * 12) + + def test_nested_wrong_row_count(self): + with pytest.raises(Invalid, match="3 rows"): + _transform_matrix([[1, 0, 0], [0, 1, 0]]) + + def test_nested_wrong_column_count(self): + with pytest.raises(Invalid, match="3 numbers"): + _transform_matrix([[1, 0], [0, 1, 0], [0, 0, 1]]) + + def test_empty_list(self): + with pytest.raises(Invalid): + _transform_matrix([]) + + def test_not_a_list(self): + with pytest.raises(Invalid): + _transform_matrix("identity") + + +class TestValidateMatrixOptions: + """Tests for mutual exclusivity of axis_map and transform_matrix.""" + + def test_neither_passes(self): + config = {"some_key": "value"} + assert _validate_matrix_options(config) is config + + def test_axis_map_only_passes(self): + config = {CONF_AXIS_MAP: {"x": "x", "y": "y", "z": "z"}} + assert _validate_matrix_options(config) is config + + def test_transform_matrix_only_passes(self): + config = {CONF_TRANSFORM_MATRIX: [1, 0, 0, 0, 1, 0, 0, 0, 1]} + assert _validate_matrix_options(config) is config + + def test_both_raises(self): + config = { + CONF_AXIS_MAP: {"x": "x", "y": "y", "z": "z"}, + CONF_TRANSFORM_MATRIX: [1, 0, 0, 0, 1, 0, 0, 0, 1], + } + with pytest.raises(Invalid, match="mutually exclusive"): + _validate_matrix_options(config) + + +# --- Axis map to matrix --- + + +class TestAxisMapToMatrix: + """Tests for _axis_map_to_matrix conversion.""" + + def test_identity(self): + assert _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) == [ + 1, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 1, + ] + + def test_swap_xy(self): + # x←y, y←x, z←z + assert _axis_map_to_matrix({"x": "y", "y": "x", "z": "z"}) == [ + 0, + 1, + 0, + 1, + 0, + 0, + 0, + 0, + 1, + ] + + def test_rotate_xyz(self): + # x←y, y←z, z←x + assert _axis_map_to_matrix({"x": "y", "y": "z", "z": "x"}) == [ + 0, + 1, + 0, + 0, + 0, + 1, + 1, + 0, + 0, + ] + + def test_negate_x(self): + assert _axis_map_to_matrix({"x": "-x", "y": "y", "z": "z"}) == [ + -1, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 1, + ] + + def test_negate_z(self): + assert _axis_map_to_matrix({"x": "x", "y": "y", "z": "-z"}) == [ + 1, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + -1, + ] + + def test_swap_and_negate(self): + # x←-y, y←z, z←x + assert _axis_map_to_matrix({"x": "-y", "y": "z", "z": "x"}) == [ + 0, + -1, + 0, + 0, + 0, + 1, + 1, + 0, + 0, + ] + + def test_plus_prefix_ignored(self): + assert _axis_map_to_matrix({"x": "+y", "y": "z", "z": "x"}) == [ + 0, + 1, + 0, + 0, + 0, + 1, + 1, + 0, + 0, + ] + + +# --- Sensor expression generation --- + + +def _expr_str(sensor_type: str) -> str: + """Build a sensor expression via the production function and return its string form.""" + return str(build_sensor_expr(sensor_type, MockObj("data"))) + + +class TestSensorExpressions: + """Tests that sensor code generation produces correct C++ expressions.""" + + @pytest.mark.parametrize( + "sensor_type,expected_index", + [ + ("acceleration_x", 0), + ("acceleration_y", 1), + ("acceleration_z", 2), + ], + ) + def test_acceleration_sensors(self, sensor_type, expected_index): + assert _expr_str(sensor_type) == f"data.acceleration[{expected_index}]" + + @pytest.mark.parametrize( + "sensor_type,expected_index", + [ + ("angular_rate_x", 0), + ("angular_rate_y", 1), + ("angular_rate_z", 2), + ], + ) + def test_angular_rate_sensors(self, sensor_type, expected_index): + assert _expr_str(sensor_type) == f"data.angular_rate[{expected_index}]" + + @pytest.mark.parametrize( + "sensor_type,expected_index", + [ + ("gyroscope_x", 0), + ("gyroscope_y", 1), + ("gyroscope_z", 2), + ], + ) + def test_gyroscope_maps_to_angular_rate(self, sensor_type, expected_index): + """Gyroscope sensor types should be remapped to angular_rate in the expression.""" + assert _expr_str(sensor_type) == f"data.angular_rate[{expected_index}]" + + def test_roll_expression(self): + expr = _expr_str("roll") + assert "std::atan2" in expr + assert "data.acceleration[1]" in expr + assert "data.acceleration[2]" in expr + assert "180.0f" in expr + assert "std::numbers::pi_v" in expr + # Roll should NOT reference acceleration[0] + assert "data.acceleration[0]" not in expr + + def test_pitch_expression(self): + expr = _expr_str("pitch") + assert "std::atan2" in expr + assert "std::sqrt" in expr + # All three axes used + assert "data.acceleration[0]" in expr + assert "data.acceleration[1]" in expr + assert "data.acceleration[2]" in expr + assert "180.0f" in expr + assert "std::numbers::pi_v" in expr + # Pitch negates the x component + assert "(-data.acceleration[0])" in expr + + +# --- Calibration math --- +# +# Pure-Python reimplementation of the C++ calibration algorithms so we can +# verify the mathematical properties without needing to compile C++. + + +def _mat_vec(m: list[float], v: list[float]) -> list[float]: + """Multiply a row-major 3x3 matrix by a 3-vector.""" + return [ + m[0] * v[0] + m[1] * v[1] + m[2] * v[2], + m[3] * v[0] + m[4] * v[1] + m[5] * v[2], + m[6] * v[0] + m[7] * v[1] + m[8] * v[2], + ] + + +def _mat_mul(a: list[float], b: list[float]) -> list[float]: + """Multiply two row-major 3x3 matrices.""" + r = [0.0] * 9 + for i in range(3): + for j in range(3): + r[i * 3 + j] = sum(a[i * 3 + k] * b[k * 3 + j] for k in range(3)) + return r + + +def _transpose(m: list[float]) -> list[float]: + """Transpose a row-major 3x3 matrix.""" + return [m[0], m[3], m[6], m[1], m[4], m[7], m[2], m[5], m[8]] + + +def _det(m: list[float]) -> float: + """Determinant of a 3x3 matrix.""" + return ( + m[0] * (m[4] * m[8] - m[5] * m[7]) + - m[1] * (m[3] * m[8] - m[5] * m[6]) + + m[2] * (m[3] * m[7] - m[4] * m[6]) + ) + + +def _calibrate_level( + raw: list[float], matrix: list[float] | None = None +) -> list[float]: + """Python port of MotionComponent::calibrate_level. + + Composes the correction with *matrix* (defaults to identity). + """ + import math + + if matrix is None: + matrix = list(IDENTITY) + + # Apply current matrix first + mapped = _mat_vec(matrix, raw) + + nx, ny, nz = mapped + mag = math.sqrt(nx * nx + ny * ny + nz * nz) + nx /= mag + ny /= mag + nz /= mag + + if nz > 0.9999: + return matrix[:] # already aligned, preserve existing matrix + + if nz < -0.9999: + r = [1, 0, 0, 0, -1, 0, 0, 0, -1] + else: + f = 1.0 / (1.0 + nz) + r = [ + 1.0 - nx * nx * f, + -nx * ny * f, + -nx, + -nx * ny * f, + 1.0 - ny * ny * f, + -ny, + nx, + ny, + nz, + ] + + return _mat_mul(r, matrix) + + +def _calibrate_heading(matrix: list[float], raw: list[float]) -> list[float]: + """Python port of MotionComponent::calibrate_heading.""" + import math + + mapped = _mat_vec(matrix, raw) + mx, my = mapped[0], mapped[1] + h = math.sqrt(mx * mx + my * my) + sign_mx = 1.0 if mx >= 0 else -1.0 + cos_phi = sign_mx * mx / h # = |mx| / h + sin_phi = sign_mx * my / h + + old = matrix[:] + new = old[:] + new[0] = cos_phi * old[0] + sin_phi * old[3] + new[1] = cos_phi * old[1] + sin_phi * old[4] + new[2] = cos_phi * old[2] + sin_phi * old[5] + new[3] = -sin_phi * old[0] + cos_phi * old[3] + new[4] = -sin_phi * old[1] + cos_phi * old[4] + new[5] = -sin_phi * old[2] + cos_phi * old[5] + return new + + +IDENTITY = [1, 0, 0, 0, 1, 0, 0, 0, 1] + + +class TestCalibrateLevel: + """Verify the Rodrigues-based level calibration matrix.""" + + def _assert_maps_to_z(self, raw: list[float]) -> list[float]: + """Assert that the calibration matrix maps raw to [0, 0, 1].""" + import math + + m = _calibrate_level(raw) + mag = math.sqrt(sum(v * v for v in raw)) + norm = [v / mag for v in raw] + result = _mat_vec(m, norm) + assert result[0] == pytest.approx(0, abs=1e-6) + assert result[1] == pytest.approx(0, abs=1e-6) + assert result[2] == pytest.approx(1, abs=1e-6) + return m + + def test_already_flat(self): + m = _calibrate_level([0, 0, 1.0]) + assert m == IDENTITY + + def test_preserves_existing_matrix_when_flat(self): + """If already flat after axis mapping, level cal should not change the matrix.""" + swap = [0, 1, 0, 1, 0, 0, 0, 0, 1] # swap X↔Y + m = _calibrate_level([0, 0, 1.0], swap) + assert m == swap + + def test_composes_with_existing_matrix(self): + """Level calibration should correct tilt while preserving an existing axis swap.""" + import math + + swap = [0, 1, 0, 1, 0, 0, 0, 0, 1] # swap X↔Y + # Tilted raw: gravity has X component in raw frame + raw = [0.3, 0.0, 0.954] + m = _calibrate_level(raw, swap) + # After calibration, current raw should map to [0, 0, ~1] + mag = math.sqrt(sum(v * v for v in raw)) + norm = [v / mag for v in raw] + result = _mat_vec(m, norm) + assert result[0] == pytest.approx(0, abs=1e-5) + assert result[1] == pytest.approx(0, abs=1e-5) + assert result[2] == pytest.approx(1, abs=1e-5) + # Result should differ from calibrating without the swap + m_no_swap = _calibrate_level(raw) + assert m != m_no_swap + + def test_upside_down(self): + m = _calibrate_level([0, 0, -1.0]) + # 180° about X + assert m == [1, 0, 0, 0, -1, 0, 0, 0, -1] + result = _mat_vec(m, [0, 0, -1]) + assert result[2] == pytest.approx(1, abs=1e-6) + + def test_gravity_along_x(self): + self._assert_maps_to_z([1.0, 0, 0]) + + def test_gravity_along_neg_x(self): + self._assert_maps_to_z([-1.0, 0, 0]) + + def test_gravity_along_y(self): + self._assert_maps_to_z([0, 1.0, 0]) + + def test_tilted_45_degrees(self): + import math + + self._assert_maps_to_z( + [math.sin(math.radians(45)), 0, math.cos(math.radians(45))] + ) + + def test_arbitrary_vector(self): + self._assert_maps_to_z([0.3, -0.5, 0.81]) + + def test_unnormalized_input(self): + """Input does not need to be unit length.""" + self._assert_maps_to_z([0.6, -1.0, 1.62]) + + @pytest.mark.parametrize( + "raw", + [ + [1.0, 0, 0], + [0, 1.0, 0], + [0.3, -0.5, 0.81], + [-0.7, 0.4, 0.59], + ], + ) + def test_result_is_proper_rotation(self, raw): + """The resulting matrix should be orthogonal with determinant +1.""" + m = _calibrate_level(raw) + # R^T * R ≈ I + product = _mat_mul(_transpose(m), m) + for i in range(9): + expected = 1.0 if i % 4 == 0 else 0.0 + assert product[i] == pytest.approx(expected, abs=1e-6) + # det ≈ 1 + assert _det(m) == pytest.approx(1.0, abs=1e-6) + + +class TestCalibrateHeading: + """Verify the Z-rotation heading correction.""" + + def test_y_axis_tilt_no_heading_error(self): + """Device tilted purely around Y — heading should already be correct.""" + import math + + flat_raw = [0, 0, 1.0] + level_m = _calibrate_level(flat_raw) + # Tilt 30° around Y: gravity = [-sin30, 0, cos30] + tilted_raw = [-math.sin(math.radians(30)), 0, math.cos(math.radians(30))] + heading_m = _calibrate_heading(level_m, tilted_raw) + # Matrix should barely change since there's no Y component + for i in range(9): + assert heading_m[i] == pytest.approx(level_m[i], abs=1e-6) + + def test_corrects_heading_rotation(self): + """After level+heading calibration, mapped Y should be ~0 when tilted.""" + import math + + # Simulate a sensor whose chip is rotated 30° around Z relative to enclosure + angle = math.radians(30) + # When the enclosure is flat, the raw reading is [0, 0, 1] regardless of Z rotation + level_m = _calibrate_level([0, 0, 1.0]) + + # When tilted around the enclosure's Y axis, the raw reading in the + # chip frame has both X and Y components due to the Z-rotation offset + tilt = math.radians(20) + # In enclosure frame: [-sin(tilt), 0, cos(tilt)] + # Rotated by Z-angle into chip frame: + ex = -math.sin(tilt) * math.cos(angle) + ey = -math.sin(tilt) * math.sin(angle) + ez = math.cos(tilt) + tilted_raw = [ex, ey, ez] + + heading_m = _calibrate_heading(level_m, tilted_raw) + # After correction, mapped Y should be 0 + result = _mat_vec(heading_m, tilted_raw) + assert result[1] == pytest.approx(0, abs=1e-6) + # Z should still be correct + assert result[2] == pytest.approx(math.cos(tilt), abs=1e-6) + + def test_full_calibration_sequence(self): + """End-to-end: level then heading produces correct frame alignment.""" + import math + + # Chip is mounted tilted 15° around Y and 25° around Z + # Build the chip-to-enclosure rotation: Rz(25°) * Ry(15°) + yz = math.radians(25) + yy = math.radians(15) + # Ry(yy) + ry = [ + math.cos(yy), + 0, + math.sin(yy), + 0, + 1, + 0, + -math.sin(yy), + 0, + math.cos(yy), + ] + # Rz(yz) + rz = [ + math.cos(yz), + -math.sin(yz), + 0, + math.sin(yz), + math.cos(yz), + 0, + 0, + 0, + 1, + ] + chip_rot = _mat_mul(rz, ry) # chip orientation in enclosure frame + # Inverse (transpose) maps enclosure vectors to chip readings + chip_rot_inv = _transpose(chip_rot) + + # Step 1: Device flat — gravity in enclosure frame is [0, 0, 1] + flat_raw = _mat_vec(chip_rot_inv, [0, 0, 1]) + level_m = _calibrate_level(flat_raw) + + # After level calibration, flat reading should map to [0, 0, 1] + check_flat = _mat_vec(level_m, flat_raw) + assert check_flat[0] == pytest.approx(0, abs=1e-5) + assert check_flat[1] == pytest.approx(0, abs=1e-5) + assert check_flat[2] == pytest.approx(1, abs=1e-5) + + # Step 2: Tilt enclosure around Y by 20° + tilt = math.radians(20) + tilted_enclosure = [-math.sin(tilt), 0, math.cos(tilt)] + tilted_raw = _mat_vec(chip_rot_inv, tilted_enclosure) + heading_m = _calibrate_heading(level_m, tilted_raw) + + # After heading calibration, the mapped reading should be + # [-sin(tilt), 0, cos(tilt)] — all horizontal component in X + result = _mat_vec(heading_m, tilted_raw) + assert result[0] == pytest.approx(-math.sin(tilt), abs=1e-5) + assert result[1] == pytest.approx(0, abs=1e-5) + assert result[2] == pytest.approx(math.cos(tilt), abs=1e-5) + + @pytest.mark.parametrize( + "raw", + [ + [0.3, -0.5, 0.81], + [-0.7, 0.4, 0.59], + ], + ) + def test_heading_preserves_orthogonality(self, raw): + """Heading correction composed with level should remain a proper rotation.""" + + level_m = _calibrate_level(raw) + # Create a tilted reading for heading calibration + tilt_raw = [v + 0.3 for v in raw] # perturb to get XY component + heading_m = _calibrate_heading(level_m, tilt_raw) + product = _mat_mul(_transpose(heading_m), heading_m) + for i in range(9): + expected = 1.0 if i % 4 == 0 else 0.0 + assert product[i] == pytest.approx(expected, abs=1e-5) + assert _det(heading_m) == pytest.approx(1.0, abs=1e-5) + + +# --- Calibration action schema & codegen --- + + +class TestCalibrateActionSchema: + """Tests for the CALIBRATE_ACTION_SCHEMA used by both calibration actions.""" + + def test_schema_accepts_on_success_key(self): + """on_success must be a recognised optional key.""" + schema_keys = {str(k) for k in CALIBRATE_ACTION_SCHEMA.schema} + assert CONF_ON_SUCCESS in schema_keys + + def test_schema_accepts_on_error_key(self): + """on_error must be a recognised optional key.""" + schema_keys = {str(k) for k in CALIBRATE_ACTION_SCHEMA.schema} + assert CONF_ON_ERROR in schema_keys + + +@pytest.fixture +def mock_codegen(): + """Mock cg and automation functions used by _build_calibrate_action.""" + mock_var = MagicMock() + mock_parent = MagicMock() + + with ( + patch( + "esphome.components.motion.cg.get_variable", + new_callable=AsyncMock, + return_value=mock_parent, + ) as mock_get_var, + patch( + "esphome.components.motion.cg.new_Pvariable", + return_value=mock_var, + ) as mock_new_pvar, + patch( + "esphome.components.motion.automation.build_automation", + new_callable=AsyncMock, + ) as mock_build_auto, + ): + yield { + "get_variable": mock_get_var, + "new_Pvariable": mock_new_pvar, + "build_automation": mock_build_auto, + "var": mock_var, + "parent": mock_parent, + } + + +@pytest.mark.asyncio +async def test_build_calibrate_action_no_triggers(mock_codegen): + """Without on_success/on_error, build_automation should not be called.""" + config = {CONF_ID: MagicMock()} + action_id = MagicMock() + template_arg = MagicMock() + + result = await _build_calibrate_action(config, action_id, template_arg, []) + + assert result is mock_codegen["var"] + mock_codegen["new_Pvariable"].assert_called_once_with( + action_id, template_arg, mock_codegen["parent"] + ) + mock_codegen["build_automation"].assert_not_called() + + +@pytest.mark.asyncio +async def test_build_calibrate_action_with_on_success(mock_codegen): + """on_success should wire build_automation to get_success_trigger().""" + on_success_config = MagicMock() + config = {CONF_ID: MagicMock(), CONF_ON_SUCCESS: on_success_config} + + await _build_calibrate_action(config, MagicMock(), MagicMock(), []) + + mock_codegen["build_automation"].assert_called_once_with( + mock_codegen["var"].get_success_trigger(), [], on_success_config + ) + + +@pytest.mark.asyncio +async def test_build_calibrate_action_with_on_error(mock_codegen): + """on_error should wire build_automation to get_error_trigger().""" + on_error_config = MagicMock() + config = {CONF_ID: MagicMock(), CONF_ON_ERROR: on_error_config} + + await _build_calibrate_action(config, MagicMock(), MagicMock(), []) + + mock_codegen["build_automation"].assert_called_once_with( + mock_codegen["var"].get_error_trigger(), [], on_error_config + ) + + +@pytest.mark.asyncio +async def test_build_calibrate_action_with_both_triggers(mock_codegen): + """Both on_success and on_error should each produce a build_automation call.""" + on_success_config = MagicMock() + on_error_config = MagicMock() + config = { + CONF_ID: MagicMock(), + CONF_ON_SUCCESS: on_success_config, + CONF_ON_ERROR: on_error_config, + } + + await _build_calibrate_action(config, MagicMock(), MagicMock(), []) + + assert mock_codegen["build_automation"].call_count == 2 + calls = mock_codegen["build_automation"].call_args_list + # First call: on_success + assert calls[0].args == ( + mock_codegen["var"].get_success_trigger(), + [], + on_success_config, + ) + # Second call: on_error + assert calls[1].args == ( + mock_codegen["var"].get_error_trigger(), + [], + on_error_config, + ) + + +# --- Clear calibration action --- + + +class TestClearActionSchema: + """Tests for CLEAR_ACTION_SCHEMA.""" + + def test_schema_has_save_key(self): + schema_keys = {str(k) for k in CLEAR_ACTION_SCHEMA.schema} + assert CONF_SAVE in schema_keys + + def test_save_defaults_to_false(self): + result = CLEAR_ACTION_SCHEMA({CONF_ID: "x"}) + assert result[CONF_SAVE] is False + + +@pytest.fixture +def mock_clear_codegen(): + """Mock cg functions used by clear_calibration_to_code.""" + mock_var = MagicMock() + mock_parent = MagicMock() + with ( + patch( + "esphome.components.motion.cg.get_variable", + new_callable=AsyncMock, + return_value=mock_parent, + ), + patch( + "esphome.components.motion.cg.new_Pvariable", + return_value=mock_var, + ) as mock_new_pvar, + patch("esphome.components.motion.cg.add") as mock_add, + ): + yield {"new_Pvariable": mock_new_pvar, "add": mock_add, "var": mock_var} + + +@pytest.mark.asyncio +async def test_clear_action_without_save(mock_clear_codegen): + """With save=False, set_save should not be emitted.""" + config = {CONF_ID: MagicMock(), CONF_SAVE: False} + result = await clear_calibration_to_code(config, MagicMock(), MagicMock(), []) + assert result is mock_clear_codegen["var"] + mock_clear_codegen["add"].assert_not_called() + + +@pytest.mark.asyncio +async def test_clear_action_with_save(mock_clear_codegen): + """With save=True, set_save(True) should be emitted exactly once.""" + config = {CONF_ID: MagicMock(), CONF_SAVE: True} + await clear_calibration_to_code(config, MagicMock(), MagicMock(), []) + mock_clear_codegen["var"].set_save.assert_called_once_with(True) + mock_clear_codegen["add"].assert_called_once() + + +# --- Calibration persistence invalidation --- +# +# The C++ side stores a hash of the build-time base matrix alongside the saved +# calibration so a changed axis_map invalidates stale NVS data without orphaning +# storage (the pref key stays ID-stable). These tests pin the design properties +# of that base-matrix fingerprint: deterministic for identical maps, distinct +# for different ones. + + +def _hash_matrix(matrix: list[float]) -> int: + """Python port of the C++ hash_matrix() (FNV-1a over the float bytes).""" + import struct + + data = struct.pack("<9f", *matrix) + h = 2166136261 + for b in data: + h ^= b + h = (h * 16777619) & 0xFFFFFFFF + return h + + +class TestBaseMatrixHash: + """Properties of the base-matrix fingerprint used for NVS invalidation.""" + + def test_identical_axis_maps_hash_equal(self): + a = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) + b = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) + assert _hash_matrix([float(v) for v in a]) == _hash_matrix( + [float(v) for v in b] + ) + + def test_different_axis_maps_hash_differ(self): + identity = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) + swapped = _axis_map_to_matrix({"x": "y", "y": "x", "z": "z"}) + assert _hash_matrix([float(v) for v in identity]) != _hash_matrix( + [float(v) for v in swapped] + ) + + def test_sign_change_hashes_differ(self): + pos = _axis_map_to_matrix({"x": "x", "y": "y", "z": "z"}) + neg = _axis_map_to_matrix({"x": "-x", "y": "y", "z": "z"}) + assert _hash_matrix([float(v) for v in pos]) != _hash_matrix( + [float(v) for v in neg] + ) + + +# --- Sensor config schema type validation --- + + +class TestSensorConfigSchema: + """Tests for sensor CONFIG_SCHEMA type key validation.""" + + def test_invalid_type_rejected(self): + with pytest.raises((Invalid, MultipleInvalid), match="Unknown value"): + CONFIG_SCHEMA({"type": "invalid_type"}) + + def test_missing_type_rejected(self): + with pytest.raises((Invalid, MultipleInvalid)): + CONFIG_SCHEMA({}) + + @pytest.mark.parametrize( + "sensor_type", + _ACCELERATIONS + _GYROSCOPES + _ANGULAR_RATES + [CONF_PITCH, CONF_ROLL], + ) + def test_valid_types_accepted(self, sensor_type): + """Valid sensor types should pass type validation (errors from missing + required fields like motion_id are expected and acceptable).""" + try: + CONFIG_SCHEMA({"type": sensor_type}) + except (Invalid, MultipleInvalid) as e: + # Should NOT be a type validation error + assert "Unknown value" not in str(e), ( + f"Type '{sensor_type}' was rejected as unknown" + )