[nrf52] allow to update OTA via cmd (#12344)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick+github@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
tomaszduda23
2026-03-08 09:40:52 +01:00
committed by GitHub
parent a530aeec22
commit 2c705810cd
3 changed files with 194 additions and 9 deletions

View File

@@ -392,22 +392,42 @@ def _upload_using_platformio(
def upload_program(config: ConfigType, args, host: str) -> bool:
from esphome.__main__ import check_permissions, get_port_type
result = 0
handled = False
mcumgr_device: str | None = None
if get_port_type(host) == "SERIAL":
check_permissions(host)
result = _upload_using_platformio(config, host, ["-t", "upload"])
handled = True
if zephyr_data()[KEY_BOOTLOADER] == BOOTLOADER_MCUBOOT:
mcumgr_device = host
else:
result = _upload_using_platformio(config, host, ["-t", "upload"])
if result != 0:
raise EsphomeError(f"Upload failed with result: {result}")
return True # Handled: platformio serial upload
if host == "PYOCD":
result = _upload_using_platformio(config, host, ["-t", "flash_pyocd"])
handled = True
if result != 0:
raise EsphomeError(f"Upload failed with result: {result}")
return True # Handled: platformio PYOCD upload
if result != 0:
raise EsphomeError(f"Upload failed with result: {result}")
# Deferred imports: bleak/smpclient are heavy, only load for BLE/mcumgr paths
from .ble_logger import is_mac_address
from .ota import smpmgr_scan, smpmgr_upload
return handled
if host == "BLE":
mcumgr_device = asyncio.run(smpmgr_scan(CORE.name))
if is_mac_address(host):
mcumgr_device = host
if mcumgr_device:
firmware = Path(
CORE.relative_pioenvs_path(CORE.name, "zephyr", "app_update.bin")
).resolve()
asyncio.run(smpmgr_upload(mcumgr_device, firmware))
return True # Handled: mcumgr OTA upload
return False # Not handled: let caller try default upload methods
def show_logs(config: ConfigType, args, devices: list[str]) -> bool:
@@ -415,7 +435,7 @@ def show_logs(config: ConfigType, args, devices: list[str]) -> bool:
from .ble_logger import is_mac_address, logger_connect, logger_scan
if devices[0] == "BLE":
ble_device = asyncio.run(logger_scan(CORE.config["esphome"]["name"]))
ble_device = asyncio.run(logger_scan(CORE.name))
if ble_device:
address = ble_device.address
else:

View File

@@ -0,0 +1,164 @@
import asyncio
from dataclasses import asdict
import json
import logging
from pathlib import Path
from bleak import BleakScanner
from bleak.exc import BleakDeviceNotFoundError
from smp.exceptions import SMPBadStartDelimiter
from smpclient import SMPClient
from smpclient.generics import error, success
from smpclient.mcuboot import IMAGE_TLV, ImageInfo, MCUBootImageError, TLVNotFound
from smpclient.requests.image_management import ImageStatesRead, ImageStatesWrite
from smpclient.requests.os_management import ResetWrite
from smpclient.transport import SMPTransportDisconnected
from smpclient.transport.ble import (
SMPBLETransport,
SMPBLETransportDeviceNotFound,
SMPBLETransportException,
)
from smpclient.transport.serial import SMPSerialTransport
from esphome.core import EsphomeError
from esphome.espota2 import ProgressBar
from .ble_logger import is_mac_address
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
BLE_SCAN_TIMEOUT = 10.0 # seconds
RESET_DELAY = 2.0 # seconds to wait before reset, allows on_end action to execute
_LOGGER = logging.getLogger(__name__)
def _json_state(o: object) -> object:
"""JSON serializer for SMP image state objects."""
if isinstance(o, (bytes, bytearray)):
return o.hex()
if hasattr(o, "hex"):
return o.hex()
if hasattr(o, "__dict__"):
return vars(o)
return str(o)
async def smpmgr_scan(name: str) -> str:
_LOGGER.info("Scanning bluetooth for %s...", name)
for device in await BleakScanner.discover(
timeout=BLE_SCAN_TIMEOUT, service_uuids=[SMP_SERVICE_UUID]
):
if device.name == name:
return device.address
raise EsphomeError(f"BLE device {name} with OTA service not found")
async def smpmgr_upload(device: str, firmware: Path) -> None:
try:
await _smpmgr_upload(device, firmware)
except SMPTransportDisconnected as exc:
raise EsphomeError(f"{device} was disconnected.") from exc
except SMPBLETransportDeviceNotFound as exc:
raise EsphomeError(f"{device} was not found.") from exc
def _get_image_tlv_sha256(file: Path) -> bytes:
_LOGGER.info("Checking image: %s", str(file))
try:
image_info = ImageInfo.load_file(str(file))
_LOGGER.info(
"Image header:\n%s", json.dumps(asdict(image_info.header), indent=2)
)
_LOGGER.debug(str(image_info))
except MCUBootImageError as exc:
raise EsphomeError("Inspection of FW image failed") from exc
except FileNotFoundError as exc:
raise EsphomeError(
f"Firmware image file not found: {file}. Build with zephyr_mcumgr enabled"
) from exc
try:
image_tlv_sha256 = image_info.get_tlv(IMAGE_TLV.SHA256)
_LOGGER.info("Image tlv sha256: %s", image_tlv_sha256)
except TLVNotFound as exc:
raise EsphomeError("Could not find IMAGE_TLV_SHA256 in image.") from exc
return image_tlv_sha256.value
async def _smpmgr_upload(device: str, firmware: Path) -> None:
image_tlv_sha256 = _get_image_tlv_sha256(firmware)
if is_mac_address(device):
smp_client = SMPClient(SMPBLETransport(), device)
else:
smp_client = SMPClient(SMPSerialTransport(), device)
_LOGGER.info("Connecting %s...", device)
try:
await smp_client.connect()
except BleakDeviceNotFoundError as exc:
raise EsphomeError(f"Device {device} not found") from exc
except SMPBLETransportException as exc:
raise EsphomeError(f"Connection error with {device}") from exc
_LOGGER.info("Connected %s...", device)
try:
await _smpmgr_upload_connected(smp_client, device, firmware, image_tlv_sha256)
finally:
await smp_client.disconnect()
async def _smpmgr_upload_connected(
smp_client: SMPClient, device: str, firmware: Path, image_tlv_sha256: bytes
) -> None:
try:
image_state = await smp_client.request(ImageStatesRead(), 2.5)
except (SMPBadStartDelimiter, TimeoutError) as exc:
raise EsphomeError(f"mcumgr is not supported by device ({device})") from exc
already_uploaded = False
if error(image_state):
raise EsphomeError(f"Failed to read image state from {device}: {image_state}")
if success(image_state):
if len(image_state.images) == 0:
_LOGGER.warning("No images on device!")
for image in image_state.images:
_LOGGER.info(
"Image state:\n%s",
json.dumps(image, indent=2, default=_json_state),
)
if image.active and not image.confirmed:
raise EsphomeError("No free slot. Testing mode but not confirmed yet.")
if image.hash == image_tlv_sha256:
if already_uploaded:
raise EsphomeError("Both slots have the same image already")
if image.confirmed:
raise EsphomeError("The same image already confirmed")
_LOGGER.warning("The same image already uploaded")
already_uploaded = True
if not already_uploaded:
with open(firmware, "rb") as file:
image = file.read()
upload_size = len(image)
progress = ProgressBar()
progress.update(0)
try:
async for offset in smp_client.upload(image):
progress.update(offset / upload_size)
finally:
progress.done()
_LOGGER.info("Mark image for testing")
r = await smp_client.request(ImageStatesWrite(hash=image_tlv_sha256), 1.0)
if error(r):
raise EsphomeError(f"Failed to mark image for testing on {device}: {r}")
await asyncio.sleep(RESET_DELAY)
_LOGGER.info("Reset")
r = await smp_client.request(ResetWrite(), 1.0)
if error(r):
raise EsphomeError(f"Failed to reset {device}: {r}")

View File

@@ -23,6 +23,7 @@ resvg-py==0.2.6
freetype-py==2.5.1
jinja2==3.1.6
bleak==2.1.1
smpclient==6.0.0
requests==2.32.5
# esp-idf >= 5.0 requires this