[dashboard] Remove legacy web dashboard (#17124)

This commit is contained in:
Jesse Hills
2026-06-22 09:33:27 +12:00
committed by GitHub
parent c4abc5476e
commit d0e3e98d55
47 changed files with 109 additions and 6693 deletions

View File

@@ -41,7 +41,6 @@ function hasCoreChanges(changedFiles) {
*/ */
function hasDashboardChanges(changedFiles) { function hasDashboardChanges(changedFiles) {
return changedFiles.some(file => return changedFiles.some(file =>
file.startsWith('esphome/dashboard/') ||
file.startsWith('esphome/components/dashboard_import/') file.startsWith('esphome/components/dashboard_import/')
); );
} }

View File

@@ -1,119 +0,0 @@
name: Add Dashboard Deprecation Comment
on:
pull_request_target:
types: [opened, synchronize]
# All API calls (pulls.listFiles + issues.{list,create,update}Comment) are performed with
# the App token minted below, so the workflow's GITHUB_TOKEN does not need any scopes.
permissions: {}
jobs:
dashboard-deprecation-comment:
name: Dashboard deprecation comment
runs-on: ubuntu-latest
# Release-bump PRs (bump-X.Y.Z -> beta, beta -> release) inevitably
# roll up everything merged into dev since the last cut, which can
# include dashboard changes that have already been reviewed once.
# The bot's purpose is to warn new contributors before they invest
# time -- that only applies to PRs entering dev.
if: github.event.pull_request.base.ref == 'dev'
steps:
- name: Generate a token
id: generate-token
uses: actions/create-github-app-token@bcd2ba49218906704ab6c1aa796996da409d3eb1 # v3.2.0
with:
client-id: ${{ vars.ESPHOME_GITHUB_APP_CLIENT_ID }}
private-key: ${{ secrets.ESPHOME_GITHUB_APP_PRIVATE_KEY }}
# pulls.listFiles + issues.{list,create,update}Comment on PRs. For PR resources
# the issues.*Comment APIs require the pull-requests scope, not issues.
permission-pull-requests: write
- name: Add dashboard deprecation comment
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
with:
github-token: ${{ steps.generate-token.outputs.token }}
script: |
const commentMarker = "<!-- This comment was generated automatically by the dashboard-deprecation-comment workflow. -->";
const commentBody = `Thanks for opening this PR!
Heads up: the legacy ESPHome dashboard (\`esphome/dashboard/\` and \`tests/dashboard/\`) is **deprecated** and is being replaced by [ESPHome Device Builder](https://github.com/esphome/device-builder). We are not adding new features to the legacy dashboard and it will eventually be removed from this repository.
What this means for your PR:
- **New features / enhancements**: please port the change to [esphome/device-builder](https://github.com/esphome/device-builder) instead. We are unlikely to review or merge new dashboard features here.
- **Bug fixes**: small fixes may still be considered, but please check first whether the same issue exists in Device Builder, where the fix will have a longer life.
- **Security issues**: please do not file a public PR. Report privately via [GitHub security advisories](https://github.com/esphome/esphome/security/advisories/new) so we can coordinate a fix.
We appreciate the contribution and apologize for the friction; flagging this early so your time isn't spent on a change that may not land.
---
(Added by the PR bot)
${commentMarker}`;
async function getDashboardChanges(github, owner, repo, prNumber) {
const changedFiles = await github.paginate(
github.rest.pulls.listFiles,
{
owner: owner,
repo: repo,
pull_number: prNumber,
per_page: 100,
}
);
return changedFiles.filter(file =>
file.filename.startsWith('esphome/dashboard/') ||
file.filename.startsWith('tests/dashboard/')
);
}
async function findBotComment(github, owner, repo, prNumber) {
const comments = await github.paginate(
github.rest.issues.listComments,
{
owner: owner,
repo: repo,
issue_number: prNumber,
per_page: 100,
}
);
return comments.find(comment =>
comment.body.includes(commentMarker) && comment.user.type === "Bot"
);
}
const prNumber = context.payload.pull_request.number;
const { owner, repo } = context.repo;
const dashboardChanges = await getDashboardChanges(github, owner, repo, prNumber);
const existingComment = await findBotComment(github, owner, repo, prNumber);
if (dashboardChanges.length === 0) {
// PR doesn't (or no longer) touches the legacy dashboard. If we previously
// commented (e.g. files were removed in a later push), leave the comment in
// place for history rather than thrash on edit/delete.
return;
}
if (existingComment) {
if (existingComment.body === commentBody) {
return;
}
await github.rest.issues.updateComment({
owner: owner,
repo: repo,
comment_id: existingComment.id,
body: commentBody,
});
} else {
await github.rest.issues.createComment({
owner: owner,
repo: repo,
issue_number: prNumber,
body: commentBody,
});
}

View File

@@ -35,7 +35,6 @@ This document provides essential context for AI models interacting with this pro
2. **Code Generation** (`esphome/codegen.py`, `esphome/cpp_generator.py`): Manages Python to C++ code generation, template processing, and build flag management. 2. **Code Generation** (`esphome/codegen.py`, `esphome/cpp_generator.py`): Manages Python to C++ code generation, template processing, and build flag management.
3. **Component System** (`esphome/components/`): Contains modular hardware and software components with platform-specific implementations and dependency management. 3. **Component System** (`esphome/components/`): Contains modular hardware and software components with platform-specific implementations and dependency management.
4. **Core Framework** (`esphome/core/`): Manages the application lifecycle, hardware abstraction, and component registration. 4. **Core Framework** (`esphome/core/`): Manages the application lifecycle, hardware abstraction, and component registration.
5. **Dashboard** (`esphome/dashboard/`): A web-based interface for device configuration, management, and OTA updates.
* **Platform Support:** * **Platform Support:**
1. **ESP32** (`components/esp32/`): Espressif ESP32 family. Supports multiple variants (Original, C2, C3, C5, C6, H2, P4, S2, S3) with ESP-IDF framework. Arduino framework supports only a subset of the variants (Original, C3, S2, S3). 1. **ESP32** (`components/esp32/`): Espressif ESP32 family. Supports multiple variants (Original, C2, C3, C5, C6, H2, P4, S2, S3) with ESP-IDF framework. Arduino framework supports only a subset of the variants (Original, C3, S2, S3).
@@ -456,7 +455,6 @@ This document provides essential context for AI models interacting with this pro
* **Debug Tools:** * **Debug Tools:**
- `esphome config <file>.yaml` to validate configuration. - `esphome config <file>.yaml` to validate configuration.
- `esphome compile <file>.yaml` to compile without uploading. - `esphome compile <file>.yaml` to compile without uploading.
- Check the Dashboard for real-time logs.
- Use component-specific debug logging. - Use component-specific debug logging.
* **Common Issues:** * **Common Issues:**
- **Import Errors**: Check component dependencies and `PYTHONPATH`. - **Import Errors**: Check component dependencies and `PYTHONPATH`.
@@ -658,7 +656,7 @@ This document provides essential context for AI models interacting with this pro
If you need a real-world example, search for components that use `@dataclass` with `CORE.data` in the codebase. Note: Some components may use `TypedDict` for dictionary-based storage; both patterns are acceptable depending on your needs. If you need a real-world example, search for components that use `@dataclass` with `CORE.data` in the codebase. Note: Some components may use `TypedDict` for dictionary-based storage; both patterns are acceptable depending on your needs.
**Why this matters:** **Why this matters:**
- Module-level globals persist between compilation runs if the dashboard doesn't fork/exec - Module-level globals persist between compilation runs if the host process (e.g. device-builder) doesn't fork/exec
- `CORE.data` automatically clears between runs - `CORE.data` automatically clears between runs
- Namespacing under `DOMAIN` prevents key collisions between components - Namespacing under `DOMAIN` prevents key collisions between components
- `@dataclass` provides type safety and cleaner attribute access - `@dataclass` provides type safety and cleaner attribute access

View File

@@ -527,7 +527,7 @@ def has_resolvable_address() -> bool:
if has_ip_address(): if has_ip_address():
return True return True
# The dashboard pre-resolves the device and passes the IPs via # device-builder pre-resolves the device and passes the IPs via
# --mdns-address-cache/--dns-address-cache; honor a cached address even when the # --mdns-address-cache/--dns-address-cache; honor a cached address even when the
# device has mDNS disabled (e.g. a .local host found via ping). # device has mDNS disabled (e.g. a .local host found via ping).
if CORE.address_cache and CORE.address_cache.get_addresses(CORE.address): if CORE.address_cache and CORE.address_cache.get_addresses(CORE.address):
@@ -1715,9 +1715,13 @@ def command_bundle(args: ArgsProtocol, config: ConfigType) -> int | None:
def command_dashboard(args: ArgsProtocol) -> int | None: def command_dashboard(args: ArgsProtocol) -> int | None:
from esphome.dashboard import dashboard raise EsphomeError(
"The built-in dashboard has been removed from ESPHome. "
return dashboard.start_dashboard(args) "Install and run ESPHome Device Builder instead:\n"
" pip install esphome-device-builder\n"
" esphome-device-builder\n"
"See https://github.com/esphome/device-builder for more information."
)
def run_multiple_configs( def run_multiple_configs(
@@ -2379,44 +2383,22 @@ def parse_args(argv):
"configuration", help="Your YAML file or configuration directory.", nargs="*" "configuration", help="Your YAML file or configuration directory.", nargs="*"
) )
parser_dashboard = subparsers.add_parser( # The dashboard moved to ESPHome Device Builder; the command is kept only to
"dashboard", help="Create a simple web server for a dashboard." # print a redirect (see command_dashboard). Accept and ignore the old flags
# so legacy invocations reach that message instead of failing on argparse
# "unrecognized arguments".
parser_dashboard = subparsers.add_parser("dashboard")
parser_dashboard.add_argument("configuration", nargs="?", help=argparse.SUPPRESS)
parser_dashboard.add_argument("--port", help=argparse.SUPPRESS)
parser_dashboard.add_argument("--address", help=argparse.SUPPRESS)
parser_dashboard.add_argument("--username", help=argparse.SUPPRESS)
parser_dashboard.add_argument("--password", help=argparse.SUPPRESS)
parser_dashboard.add_argument("--socket", help=argparse.SUPPRESS)
parser_dashboard.add_argument(
"--open-ui", action="store_true", help=argparse.SUPPRESS
) )
parser_dashboard.add_argument( parser_dashboard.add_argument(
"configuration", help="Your YAML configuration file directory." "--ha-addon", action="store_true", help=argparse.SUPPRESS
)
parser_dashboard.add_argument(
"--port",
help="The HTTP port to open connections on. Defaults to 6052.",
type=int,
default=6052,
)
parser_dashboard.add_argument(
"--address",
help="The address to bind to.",
type=str,
default="0.0.0.0",
)
parser_dashboard.add_argument(
"--username",
help="The optional username to require for authentication.",
type=str,
default="",
)
parser_dashboard.add_argument(
"--password",
help="The optional password to require for authentication.",
type=str,
default="",
)
parser_dashboard.add_argument(
"--open-ui", help="Open the dashboard UI in a browser.", action="store_true"
)
parser_dashboard.add_argument(
"--ha-addon", help=argparse.SUPPRESS, action="store_true"
)
parser_dashboard.add_argument(
"--socket", help="Make the dashboard serve under a unix socket", type=str
) )
parser_vscode = subparsers.add_parser("vscode") parser_vscode = subparsers.add_parser("vscode")
@@ -2511,11 +2493,7 @@ def run_esphome(argv):
elif args.quiet: elif args.quiet:
args.log_level = "CRITICAL" args.log_level = "CRITICAL"
setup_log( setup_log(log_level=args.log_level)
log_level=args.log_level,
# Show timestamp for dashboard access logs
include_timestamp=args.command == "dashboard",
)
if args.command in PRE_CONFIG_ACTIONS: if args.command in PRE_CONFIG_ACTIONS:
try: try:

View File

@@ -92,7 +92,6 @@ def import_config(
"""Materialise a dashboard-imported device's YAML on disk. """Materialise a dashboard-imported device's YAML on disk.
Used by: Used by:
- esphome.dashboard (legacy dashboard)
- device-builder (esphome/device-builder) — called from the - device-builder (esphome/device-builder) — called from the
``devices/import`` WS handler to seed the YAML for an adopted ``devices/import`` WS handler to seed the YAML for an adopted
factory firmware. Coordinate before changing the kwargs or the factory firmware. Coordinate before changing the kwargs or the

View File

@@ -533,14 +533,12 @@ def get_board(core_obj=None):
def get_download_types(storage_json): def get_download_types(storage_json):
"""Binary-download entries for a built ESP32 firmware. """Binary-download entries for a built ESP32 firmware.
Used by: Used by device-builder (esphome/device-builder), via
- esphome.dashboard (legacy "Download .bin" button)
- device-builder (esphome/device-builder) — same dispatch via
``importlib.import_module(f"esphome.components.{platform}")`` ``importlib.import_module(f"esphome.components.{platform}")``
then ``module.get_download_types(storage)``. The contract is then ``module.get_download_types(storage)``. The contract is
"returns ``list[dict]`` with at least ``title`` / "returns ``list[dict]`` with at least ``title`` /
``description`` / ``file`` / ``download`` keys"; please keep ``description`` / ``file`` / ``download`` keys"; please keep
the shape stable so the new dashboard's download panel the shape stable so the download panel
doesn't have to special-case per-platform schemas. doesn't have to special-case per-platform schemas.
""" """
return [ return [

View File

@@ -97,14 +97,12 @@ def set_core_data(config):
def get_download_types(storage_json): def get_download_types(storage_json):
"""Binary-download entries for a built ESP8266 firmware. """Binary-download entries for a built ESP8266 firmware.
Used by: Used by device-builder (esphome/device-builder), via
- esphome.dashboard (legacy "Download .bin" button)
- device-builder (esphome/device-builder) — same dispatch via
``importlib.import_module(f"esphome.components.{platform}")`` ``importlib.import_module(f"esphome.components.{platform}")``
then ``module.get_download_types(storage)``. The contract is then ``module.get_download_types(storage)``. The contract is
"returns ``list[dict]`` with at least ``title`` / "returns ``list[dict]`` with at least ``title`` /
``description`` / ``file`` / ``download`` keys"; please keep ``description`` / ``file`` / ``download`` keys"; please keep
the shape stable so the new dashboard's download panel the shape stable so the download panel
doesn't have to special-case per-platform schemas. doesn't have to special-case per-platform schemas.
""" """
return [ return [

View File

@@ -158,14 +158,12 @@ def only_on_family(*, supported=None, unsupported=None):
def get_download_types(storage_json: StorageJSON = None): def get_download_types(storage_json: StorageJSON = None):
"""Binary-download entries for a built LibreTiny firmware. """Binary-download entries for a built LibreTiny firmware.
Used by: Used by device-builder (esphome/device-builder), via
- esphome.dashboard (legacy "Download .bin" button)
- device-builder (esphome/device-builder) — same dispatch via
``importlib.import_module(f"esphome.components.{platform}")`` ``importlib.import_module(f"esphome.components.{platform}")``
then ``module.get_download_types(storage)``. The contract is then ``module.get_download_types(storage)``. The contract is
"returns ``list[dict]`` with at least ``title`` / "returns ``list[dict]`` with at least ``title`` /
``description`` / ``file`` / ``download`` keys"; please keep ``description`` / ``file`` / ``download`` keys"; please keep
the shape stable so the new dashboard's download panel the shape stable so the download panel
doesn't have to special-case per-platform schemas. doesn't have to special-case per-platform schemas.
""" """
types = [ types = [

View File

@@ -140,14 +140,12 @@ def only_on_variant(
def get_download_types(storage_json): def get_download_types(storage_json):
"""Binary-download entries for a built RP2040 firmware. """Binary-download entries for a built RP2040 firmware.
Used by: Used by device-builder (esphome/device-builder), via
- esphome.dashboard (legacy "Download .bin" button)
- device-builder (esphome/device-builder) — same dispatch via
``importlib.import_module(f"esphome.components.{platform}")`` ``importlib.import_module(f"esphome.components.{platform}")``
then ``module.get_download_types(storage)``. The contract is then ``module.get_download_types(storage)``. The contract is
"returns ``list[dict]`` with at least ``title`` / "returns ``list[dict]`` with at least ``title`` /
``description`` / ``file`` / ``download`` keys"; please keep ``description`` / ``file`` / ``download`` keys"; please keep
the shape stable so the new dashboard's download panel the shape stable so the download panel
doesn't have to special-case per-platform schemas. doesn't have to special-case per-platform schemas.
""" """
return [ return [

View File

@@ -1,32 +0,0 @@
from __future__ import annotations
import sys
from esphome.enum import StrEnum
class DashboardEvent(StrEnum):
"""Dashboard WebSocket event types."""
# Server -> Client events (backend sends to frontend)
ENTRY_ADDED = "entry_added"
ENTRY_REMOVED = "entry_removed"
ENTRY_UPDATED = "entry_updated"
ENTRY_STATE_CHANGED = "entry_state_changed"
IMPORTABLE_DEVICE_ADDED = "importable_device_added"
IMPORTABLE_DEVICE_REMOVED = "importable_device_removed"
INITIAL_STATE = "initial_state" # Sent on WebSocket connection
PONG = "pong" # Response to client ping
# Client -> Server events (frontend sends to backend)
PING = "ping" # WebSocket keepalive from client
REFRESH = "refresh" # Force backend to poll for changes
MAX_EXECUTOR_WORKERS = 48
SENTINEL = object()
ESPHOME_COMMAND = [sys.executable, "-m", "esphome"]
DASHBOARD_COMMAND = [*ESPHOME_COMMAND, "--dashboard"]

View File

@@ -1,190 +0,0 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
import contextlib
from dataclasses import dataclass
from functools import partial
import json
import logging
import threading
from typing import Any
from esphome.storage_json import ignored_devices_storage_path
from ..zeroconf import DiscoveredImport
from .const import DashboardEvent
from .dns import DNSCache
from .entries import DashboardEntries
from .settings import DashboardSettings
from .status.mdns import MDNSStatus
from .status.ping import PingStatus
_LOGGER = logging.getLogger(__name__)
IGNORED_DEVICES_STORAGE_PATH = "ignored-devices.json"
MDNS_BOOTSTRAP_TIME = 7.5
@dataclass
class Event:
"""Dashboard Event."""
event_type: DashboardEvent
data: dict[str, Any]
class EventBus:
"""Dashboard event bus."""
def __init__(self) -> None:
"""Initialize the Dashboard event bus."""
self._listeners: dict[DashboardEvent, set[Callable[[Event], None]]] = {}
def async_add_listener(
self, event_type: DashboardEvent, listener: Callable[[Event], None]
) -> Callable[[], None]:
"""Add a listener to the event bus."""
self._listeners.setdefault(event_type, set()).add(listener)
return partial(self._async_remove_listener, event_type, listener)
def _async_remove_listener(
self, event_type: DashboardEvent, listener: Callable[[Event], None]
) -> None:
"""Remove a listener from the event bus."""
self._listeners[event_type].discard(listener)
def async_fire(
self, event_type: DashboardEvent, event_data: dict[str, Any]
) -> None:
"""Fire an event."""
event = Event(event_type, event_data)
_LOGGER.debug("Firing event: %s", event)
for listener in self._listeners.get(event_type, set()):
listener(event)
class ESPHomeDashboard:
"""Class that represents the dashboard."""
__slots__ = (
"bus",
"entries",
"loop",
"import_result",
"stop_event",
"ping_request",
"mqtt_ping_request",
"mdns_status",
"settings",
"dns_cache",
"_background_tasks",
"ignored_devices",
"_ping_status_task",
)
def __init__(self) -> None:
"""Initialize the ESPHomeDashboard."""
self.bus = EventBus()
self.entries: DashboardEntries | None = None
self.loop: asyncio.AbstractEventLoop | None = None
self.import_result: dict[str, DiscoveredImport] = {}
self.stop_event = threading.Event()
self.ping_request: asyncio.Event | None = None
self.mqtt_ping_request = threading.Event()
self.mdns_status: MDNSStatus | None = None
self.settings = DashboardSettings()
self.dns_cache = DNSCache()
self._background_tasks: set[asyncio.Task] = set()
self.ignored_devices: set[str] = set()
self._ping_status_task: asyncio.Task | None = None
async def async_setup(self) -> None:
"""Setup the dashboard."""
self.loop = asyncio.get_running_loop()
self.ping_request = asyncio.Event()
self.entries = DashboardEntries(self)
await self.loop.run_in_executor(None, self.load_ignored_devices)
def load_ignored_devices(self) -> None:
storage_path = ignored_devices_storage_path()
try:
with storage_path.open("r", encoding="utf-8") as f_handle:
data = json.load(f_handle)
self.ignored_devices = set(data.get("ignored_devices", set()))
except FileNotFoundError:
pass
def save_ignored_devices(self) -> None:
storage_path = ignored_devices_storage_path()
with storage_path.open("w", encoding="utf-8") as f_handle:
json.dump(
{"ignored_devices": sorted(self.ignored_devices)}, indent=2, fp=f_handle
)
def _async_start_ping_status(self, ping_status: PingStatus) -> None:
self._ping_status_task = asyncio.create_task(ping_status.async_run())
async def async_run(self) -> None:
"""Run the dashboard."""
settings = self.settings
mdns_task: asyncio.Task | None = None
await self.entries.async_update_entries()
mdns_status = MDNSStatus(self)
ping_status = PingStatus(self)
start_ping_timer: asyncio.TimerHandle | None = None
self.mdns_status = mdns_status
if mdns_status.async_setup():
mdns_task = asyncio.create_task(mdns_status.async_run())
# Start ping MDNS_BOOTSTRAP_TIME seconds after startup to ensure
# MDNS has had a chance to resolve the devices
start_ping_timer = self.loop.call_later(
MDNS_BOOTSTRAP_TIME, self._async_start_ping_status, ping_status
)
else:
# If mDNS is not available, start the ping status immediately
self._async_start_ping_status(ping_status)
if settings.status_use_mqtt:
from .status.mqtt import MqttStatusThread
status_thread_mqtt = MqttStatusThread(self)
status_thread_mqtt.start()
try:
await asyncio.Event().wait()
finally:
_LOGGER.info("Shutting down...")
self.stop_event.set()
self.ping_request.set()
if start_ping_timer:
start_ping_timer.cancel()
if self._ping_status_task:
self._ping_status_task.cancel()
self._ping_status_task = None
if mdns_task:
mdns_task.cancel()
if settings.status_use_mqtt:
status_thread_mqtt.join()
self.mqtt_ping_request.set()
for task in self._background_tasks:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
await asyncio.sleep(0)
def async_create_background_task(
self, coro: Coroutine[Any, Any, Any]
) -> asyncio.Task:
"""Create a background task."""
task = self.loop.create_task(coro)
task.add_done_callback(self._background_tasks.discard)
return task
DASHBOARD = ESPHomeDashboard()

View File

@@ -1,153 +0,0 @@
from __future__ import annotations
import asyncio
from asyncio import events
from concurrent.futures import ThreadPoolExecutor
import contextlib
import logging
import os
from pathlib import Path
import socket
import threading
from time import monotonic
import traceback
from typing import Any
from esphome.storage_json import EsphomeStorageJSON, esphome_storage_path
from .const import MAX_EXECUTOR_WORKERS
from .core import DASHBOARD
from .web_server import make_app, start_web_server
ENV_DEV = "ESPHOME_DASHBOARD_DEV"
settings = DASHBOARD.settings
def can_use_pidfd() -> bool:
"""Check if pidfd_open is available.
Back ported from cpython 3.12
"""
if not hasattr(os, "pidfd_open"):
return False
try:
pid = os.getpid()
os.close(os.pidfd_open(pid, 0))
except OSError:
# blocked by security policy like SECCOMP
return False
return True
class DashboardEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
"""Event loop policy for Home Assistant."""
def __init__(self, debug: bool) -> None:
"""Init the event loop policy."""
super().__init__()
self.debug = debug
self._watcher: asyncio.AbstractChildWatcher | None = None
def _init_watcher(self) -> None:
"""Initialize the watcher for child processes.
Back ported from cpython 3.12
"""
with events._lock: # type: ignore[attr-defined] # pylint: disable=protected-access
if self._watcher is None: # pragma: no branch
if can_use_pidfd():
self._watcher = asyncio.PidfdChildWatcher()
else:
self._watcher = asyncio.ThreadedChildWatcher()
if threading.current_thread() is threading.main_thread():
self._watcher.attach_loop(
self._local._loop # type: ignore[attr-defined] # pylint: disable=protected-access
)
@property
def loop_name(self) -> str:
"""Return name of the loop."""
return self._loop_factory.__name__ # type: ignore[no-any-return,attr-defined]
def new_event_loop(self) -> asyncio.AbstractEventLoop:
"""Get the event loop."""
loop: asyncio.AbstractEventLoop = super().new_event_loop()
loop.set_exception_handler(_async_loop_exception_handler)
if self.debug:
loop.set_debug(True)
executor = ThreadPoolExecutor(
thread_name_prefix="SyncWorker", max_workers=MAX_EXECUTOR_WORKERS
)
loop.set_default_executor(executor)
# bind the built-in time.monotonic directly as loop.time to avoid the
# overhead of the additional method call since its the most called loop
# method and its roughly 10%+ of all the call time in base_events.py
loop.time = monotonic # type: ignore[method-assign]
return loop
def _async_loop_exception_handler(_: Any, context: dict[str, Any]) -> None:
"""Handle all exception inside the core loop."""
kwargs = {}
if exception := context.get("exception"):
kwargs["exc_info"] = (type(exception), exception, exception.__traceback__)
logger = logging.getLogger(__package__)
if source_traceback := context.get("source_traceback"):
stack_summary = "".join(traceback.format_list(source_traceback))
logger.error(
"Error doing job: %s: %s",
context["message"],
stack_summary,
**kwargs, # type: ignore[arg-type]
)
return
logger.error(
"Error doing job: %s",
context["message"],
**kwargs, # type: ignore[arg-type]
)
def start_dashboard(args) -> None:
"""Start the dashboard."""
settings.parse_args(args)
if settings.using_auth:
path = esphome_storage_path()
storage = EsphomeStorageJSON.load(path)
if storage is None:
storage = EsphomeStorageJSON.get_default()
storage.save(path)
settings.cookie_secret = storage.cookie_secret
asyncio.set_event_loop_policy(DashboardEventLoopPolicy(settings.verbose))
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(async_start(args))
async def async_start(args) -> None:
"""Start the dashboard."""
dashboard = DASHBOARD
await dashboard.async_setup()
sock: socket.socket | None = args.socket
address: str | None = args.address
port: int | None = args.port
start_web_server(make_app(args.verbose), sock, address, port, settings.config_dir)
if args.open_ui:
import webbrowser
webbrowser.open(f"http://{args.address}:{args.port}")
try:
await dashboard.async_run()
finally:
if sock:
Path(sock).unlink()

View File

@@ -1,77 +0,0 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from ipaddress import ip_address
import logging
from icmplib import NameLookupError, async_resolve
RESOLVE_TIMEOUT = 3.0
_LOGGER = logging.getLogger(__name__)
_RESOLVE_EXCEPTIONS = (TimeoutError, NameLookupError, UnicodeError)
async def _async_resolve_wrapper(hostname: str) -> list[str] | Exception:
"""Wrap the icmplib async_resolve function."""
with suppress(ValueError):
return [str(ip_address(hostname))]
try:
async with asyncio.timeout(RESOLVE_TIMEOUT):
return await async_resolve(hostname)
except _RESOLVE_EXCEPTIONS as ex:
# If the hostname ends with .local and resolution failed,
# try the bare hostname as a fallback since mDNS may not be
# working on the system but unicast DNS might resolve it
if hostname.endswith(".local"):
bare_hostname = hostname[:-6] # Remove ".local"
try:
async with asyncio.timeout(RESOLVE_TIMEOUT):
result = await async_resolve(bare_hostname)
_LOGGER.debug(
"Bare hostname %s resolved to %s", bare_hostname, result
)
return result
except _RESOLVE_EXCEPTIONS:
_LOGGER.debug("Bare hostname %s also failed to resolve", bare_hostname)
return ex
class DNSCache:
"""DNS cache for the dashboard."""
def __init__(self, ttl: int | None = 120) -> None:
"""Initialize the DNSCache."""
self._cache: dict[str, tuple[float, list[str] | Exception]] = {}
self._ttl = ttl
def get_cached_addresses(
self, hostname: str, now_monotonic: float
) -> list[str] | None:
"""Get cached addresses without triggering resolution.
Returns None if not in cache, list of addresses if found.
"""
# Normalize hostname for consistent lookups
normalized = hostname.rstrip(".").lower()
if expire_time_addresses := self._cache.get(normalized):
expire_time, addresses = expire_time_addresses
if expire_time > now_monotonic and not isinstance(addresses, Exception):
return addresses
return None
async def async_resolve(
self, hostname: str, now_monotonic: float
) -> list[str] | Exception:
"""Resolve a hostname to a list of IP address."""
if expire_time_addresses := self._cache.get(hostname):
expire_time, addresses = expire_time_addresses
if expire_time > now_monotonic:
return addresses
expires = now_monotonic + self._ttl
addresses = await _async_resolve_wrapper(hostname)
self._cache[hostname] = (expires, addresses)
return addresses

View File

@@ -1,458 +0,0 @@
from __future__ import annotations
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
from esphome import const, util
from esphome.enum import StrEnum
from esphome.storage_json import StorageJSON, ext_storage_path
from .const import DASHBOARD_COMMAND, DashboardEvent
from .util.subprocess import async_run_system_command
if TYPE_CHECKING:
from .core import ESPHomeDashboard
_LOGGER = logging.getLogger(__name__)
DashboardCacheKeyType = tuple[int, int, float, int]
@dataclass(frozen=True)
class EntryState:
"""Represents the state of an entry."""
reachable: ReachableState
source: EntryStateSource
class EntryStateSource(StrEnum):
MDNS = "mdns"
PING = "ping"
MQTT = "mqtt"
UNKNOWN = "unknown"
class ReachableState(StrEnum):
ONLINE = "online"
OFFLINE = "offline"
DNS_FAILURE = "dns_failure"
UNKNOWN = "unknown"
_BOOL_TO_REACHABLE_STATE = {
True: ReachableState.ONLINE,
False: ReachableState.OFFLINE,
None: ReachableState.UNKNOWN,
}
_REACHABLE_STATE_TO_BOOL = {
ReachableState.ONLINE: True,
ReachableState.OFFLINE: False,
ReachableState.DNS_FAILURE: False,
ReachableState.UNKNOWN: None,
}
UNKNOWN_STATE = EntryState(ReachableState.UNKNOWN, EntryStateSource.UNKNOWN)
@lru_cache # creating frozen dataclass instances is expensive, so we cache them
def bool_to_entry_state(value: bool | None, source: EntryStateSource) -> EntryState:
"""Convert a bool to an entry state."""
return EntryState(_BOOL_TO_REACHABLE_STATE[value], source)
def entry_state_to_bool(value: EntryState) -> bool | None:
"""Convert an entry state to a bool."""
return _REACHABLE_STATE_TO_BOOL[value.reachable]
class DashboardEntries:
"""Represents all dashboard entries."""
__slots__ = (
"_dashboard",
"_loop",
"_config_dir",
"_entries",
"_entry_states",
"_loaded_entries",
"_update_lock",
"_name_to_entry",
)
def __init__(self, dashboard: ESPHomeDashboard) -> None:
"""Initialize the DashboardEntries."""
self._dashboard = dashboard
self._loop = asyncio.get_running_loop()
self._config_dir = dashboard.settings.config_dir
# Entries are stored as
# {
# "path/to/file.yaml": DashboardEntry,
# ...
# }
self._entries: dict[Path, DashboardEntry] = {}
self._loaded_entries = False
self._update_lock = asyncio.Lock()
self._name_to_entry: dict[str, set[DashboardEntry]] = defaultdict(set)
def get(self, path: Path) -> DashboardEntry | None:
"""Get an entry by path."""
return self._entries.get(path)
def get_by_name(self, name: str) -> set[DashboardEntry] | None:
"""Get an entry by name."""
return self._name_to_entry.get(name)
async def _async_all(self) -> list[DashboardEntry]:
"""Return all entries."""
return list(self._entries.values())
def all(self) -> list[DashboardEntry]:
"""Return all entries."""
return asyncio.run_coroutine_threadsafe(self._async_all(), self._loop).result()
def async_all(self) -> list[DashboardEntry]:
"""Return all entries."""
return list(self._entries.values())
def set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
asyncio.run_coroutine_threadsafe(
self._async_set_state(entry, state), self._loop
).result()
async def _async_set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
self.async_set_state(entry, state)
def set_state_if_online_or_source(
self, entry: DashboardEntry, state: EntryState
) -> None:
"""Set the state for an entry if its online or provided by the source or unknown."""
asyncio.run_coroutine_threadsafe(
self._async_set_state_if_online_or_source(entry, state), self._loop
).result()
async def _async_set_state_if_online_or_source(
self, entry: DashboardEntry, state: EntryState
) -> None:
"""Set the state for an entry if its online or provided by the source or unknown."""
self.async_set_state_if_online_or_source(entry, state)
def async_set_state_if_online_or_source(
self, entry: DashboardEntry, state: EntryState
) -> None:
"""Set the state for an entry if its online or provided by the source or unknown."""
if (
state.reachable is ReachableState.ONLINE
and entry.state.reachable is not ReachableState.ONLINE
) or entry.state.source in (
EntryStateSource.UNKNOWN,
state.source,
):
self.async_set_state(entry, state)
def set_state_if_source(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry if provided by the source or unknown."""
asyncio.run_coroutine_threadsafe(
self._async_set_state_if_source(entry, state), self._loop
).result()
async def _async_set_state_if_source(
self, entry: DashboardEntry, state: EntryState
) -> None:
"""Set the state for an entry if rovided by the source or unknown."""
self.async_set_state_if_source(entry, state)
def async_set_state_if_source(
self, entry: DashboardEntry, state: EntryState
) -> None:
"""Set the state for an entry if provided by the source or unknown."""
if entry.state.source in (
EntryStateSource.UNKNOWN,
state.source,
):
self.async_set_state(entry, state)
def async_set_state(self, entry: DashboardEntry, state: EntryState) -> None:
"""Set the state for an entry."""
if entry.state == state:
return
entry.state = state
self._dashboard.bus.async_fire(
DashboardEvent.ENTRY_STATE_CHANGED, {"entry": entry, "state": state}
)
async def async_request_update_entries(self) -> None:
"""Request an update of the dashboard entries from disk.
If an update is already in progress, this will do nothing.
"""
if self._update_lock.locked():
_LOGGER.debug("Dashboard entries are already being updated")
return
await self.async_update_entries()
async def async_update_entries(self) -> None:
"""Update the dashboard entries from disk."""
async with self._update_lock:
await self._async_update_entries()
def _load_entries(
self, entries: dict[DashboardEntry, DashboardCacheKeyType]
) -> None:
"""Load all entries from disk."""
for entry, cache_key in entries.items():
_LOGGER.debug(
"Loading dashboard entry %s because cache key changed: %s",
entry.path,
cache_key,
)
entry.load_from_disk(cache_key)
async def _async_update_entries(self) -> list[DashboardEntry]:
"""Sync the dashboard entries from disk."""
_LOGGER.debug("Updating dashboard entries")
# At some point it would be nice to use watchdog to avoid polling
path_to_cache_key = await self._loop.run_in_executor(
None, self._get_path_to_cache_key
)
entries = self._entries
name_to_entry = self._name_to_entry
added: dict[DashboardEntry, DashboardCacheKeyType] = {}
updated: dict[DashboardEntry, DashboardCacheKeyType] = {}
removed: set[DashboardEntry] = {
entry
for filename, entry in entries.items()
if filename not in path_to_cache_key
}
original_names: dict[DashboardEntry, str] = {}
for path, cache_key in path_to_cache_key.items():
if not (entry := entries.get(path)):
entry = DashboardEntry(path, cache_key)
added[entry] = cache_key
continue
if entry.cache_key != cache_key:
updated[entry] = cache_key
original_names[entry] = entry.name
if added or updated:
await self._loop.run_in_executor(
None, self._load_entries, {**added, **updated}
)
bus = self._dashboard.bus
for entry in added:
entries[entry.path] = entry
name_to_entry[entry.name].add(entry)
bus.async_fire(DashboardEvent.ENTRY_ADDED, {"entry": entry})
for entry in removed:
del entries[entry.path]
name_to_entry[entry.name].discard(entry)
bus.async_fire(DashboardEvent.ENTRY_REMOVED, {"entry": entry})
for entry in updated:
if (original_name := original_names[entry]) != (current_name := entry.name):
name_to_entry[original_name].discard(entry)
name_to_entry[current_name].add(entry)
bus.async_fire(DashboardEvent.ENTRY_UPDATED, {"entry": entry})
def _get_path_to_cache_key(self) -> dict[Path, DashboardCacheKeyType]:
"""Return a dict of path to cache key."""
path_to_cache_key: dict[Path, DashboardCacheKeyType] = {}
#
# The cache key is (inode, device, mtime, size)
# which allows us to avoid locking since it ensures
# every iteration of this call will always return the newest
# items from disk at the cost of a stat() call on each
# file which is much faster than reading the file
# for the cache hit case which is the common case.
#
for file in util.list_yaml_files([self._config_dir]):
try:
# Prefer the json storage path if it exists
stat = ext_storage_path(file.name).stat()
except OSError:
try:
# Fallback to the yaml file if the storage
# file does not exist or could not be generated
stat = file.stat()
except OSError:
# File was deleted, ignore
continue
path_to_cache_key[file] = (
stat.st_ino,
stat.st_dev,
stat.st_mtime,
stat.st_size,
)
return path_to_cache_key
def async_schedule_storage_json_update(self, filename: str) -> None:
"""Schedule a task to update the storage JSON file."""
self._dashboard.async_create_background_task(
async_run_system_command(
[*DASHBOARD_COMMAND, "compile", "--only-generate", filename]
)
)
class DashboardEntry:
"""Represents a single dashboard entry.
This class is thread-safe and read-only.
"""
__slots__ = (
"path",
"filename",
"_storage_path",
"cache_key",
"storage",
"state",
"_to_dict",
)
def __init__(self, path: Path, cache_key: DashboardCacheKeyType) -> None:
"""Initialize the DashboardEntry."""
self.path = path
self.filename: str = path.name
self._storage_path = ext_storage_path(self.filename)
self.cache_key = cache_key
self.storage: StorageJSON | None = None
self.state = UNKNOWN_STATE
self._to_dict: dict[str, Any] | None = None
def __repr__(self) -> str:
"""Return the representation of this entry."""
return (
f"DashboardEntry(path={self.path} "
f"address={self.address} "
f"web_port={self.web_port} "
f"name={self.name} "
f"no_mdns={self.no_mdns} "
f"state={self.state} "
")"
)
def to_dict(self) -> dict[str, Any]:
"""Return a dict representation of this entry.
The dict includes the loaded configuration but not
the current state of the entry.
"""
if self._to_dict is None:
self._to_dict = {
"name": self.name,
"friendly_name": self.friendly_name,
"configuration": self.filename,
"loaded_integrations": sorted(self.loaded_integrations),
"deployed_version": self.update_old,
"current_version": self.update_new,
"path": str(self.path),
"comment": self.comment,
"address": self.address,
"web_port": self.web_port,
"target_platform": self.target_platform,
}
return self._to_dict
def load_from_disk(self, cache_key: DashboardCacheKeyType | None = None) -> None:
"""Load this entry from disk."""
self.storage = StorageJSON.load(self._storage_path)
self._to_dict = None
#
# Currently StorageJSON.load() will return None if the file does not exist
#
# StorageJSON currently does not provide an updated cache key so we use the
# one that is passed in.
#
# The cache key was read from the disk moments ago and may be stale but
# it does not matter since we are polling anyways, and the next call to
# async_update_entries() will load it again in the extremely rare case that
# it changed between the two calls.
#
if cache_key:
self.cache_key = cache_key
@property
def address(self) -> str | None:
"""Return the address of this entry."""
if self.storage is None:
return None
return self.storage.address
@property
def no_mdns(self) -> bool | None:
"""Return the no_mdns of this entry."""
if self.storage is None:
return None
return self.storage.no_mdns
@property
def web_port(self) -> int | None:
"""Return the web port of this entry."""
if self.storage is None:
return None
return self.storage.web_port
@property
def name(self) -> str:
"""Return the name of this entry."""
if self.storage is None:
return self.filename.replace(".yml", "").replace(".yaml", "")
return self.storage.name
@property
def friendly_name(self) -> str:
"""Return the friendly name of this entry."""
if self.storage is None:
return self.name
return self.storage.friendly_name
@property
def comment(self) -> str | None:
"""Return the comment of this entry."""
if self.storage is None:
return None
return self.storage.comment
@property
def target_platform(self) -> str | None:
"""Return the target platform of this entry."""
if self.storage is None:
return None
return self.storage.target_platform
@property
def update_available(self) -> bool:
"""Return if an update is available for this entry."""
if self.storage is None:
return True
return self.update_old != self.update_new
@property
def update_old(self) -> str:
if self.storage is None:
return ""
return self.storage.esphome_version or ""
@property
def update_new(self) -> str:
return const.__version__
@property
def loaded_integrations(self) -> set[str]:
if self.storage is None:
return []
return self.storage.loaded_integrations

View File

@@ -1,76 +0,0 @@
"""Data models and builders for the dashboard."""
from __future__ import annotations
from typing import TYPE_CHECKING, TypedDict
if TYPE_CHECKING:
from esphome.zeroconf import DiscoveredImport
from .core import ESPHomeDashboard
from .entries import DashboardEntry
class ImportableDeviceDict(TypedDict):
"""Dictionary representation of an importable device."""
name: str
friendly_name: str | None
package_import_url: str
project_name: str
project_version: str
network: str
ignored: bool
class ConfiguredDeviceDict(TypedDict, total=False):
"""Dictionary representation of a configured device."""
name: str
friendly_name: str | None
configuration: str
loaded_integrations: list[str] | None
deployed_version: str | None
current_version: str | None
path: str
comment: str | None
address: str | None
web_port: int | None
target_platform: str | None
class DeviceListResponse(TypedDict):
"""Response for device list API."""
configured: list[ConfiguredDeviceDict]
importable: list[ImportableDeviceDict]
def build_importable_device_dict(
dashboard: ESPHomeDashboard, discovered: DiscoveredImport
) -> ImportableDeviceDict:
"""Build the importable device dictionary."""
return ImportableDeviceDict(
name=discovered.device_name,
friendly_name=discovered.friendly_name,
package_import_url=discovered.package_import_url,
project_name=discovered.project_name,
project_version=discovered.project_version,
network=discovered.network,
ignored=discovered.device_name in dashboard.ignored_devices,
)
def build_device_list_response(
dashboard: ESPHomeDashboard, entries: list[DashboardEntry]
) -> DeviceListResponse:
"""Build the device list response data."""
configured = {entry.name for entry in entries}
return DeviceListResponse(
configured=[entry.to_dict() for entry in entries],
importable=[
build_importable_device_dict(dashboard, res)
for res in dashboard.import_result.values()
if res.device_name not in configured
],
)

View File

@@ -1,101 +0,0 @@
from __future__ import annotations
import hmac
import os
from pathlib import Path
from typing import Any
from esphome.core import CORE
from esphome.helpers import get_bool_env
from .util.password import password_hash
# Sentinel file name used for CORE.config_path when dashboard initializes.
# This ensures .parent returns the config directory instead of root.
_DASHBOARD_SENTINEL_FILE = "___DASHBOARD_SENTINEL___.yaml"
class DashboardSettings:
"""Settings for the dashboard."""
__slots__ = (
"config_dir",
"password_hash",
"username",
"using_password",
"on_ha_addon",
"cookie_secret",
"absolute_config_dir",
"verbose",
)
def __init__(self) -> None:
"""Initialize the dashboard settings."""
self.config_dir: Path = None
self.password_hash: bytes = b""
self.username: str = ""
self.using_password: bool = False
self.on_ha_addon: bool = False
self.cookie_secret: str | None = None
self.absolute_config_dir: Path | None = None
self.verbose: bool = False
def parse_args(self, args: Any) -> None:
"""Parse the arguments."""
self.on_ha_addon: bool = args.ha_addon
password = args.password or os.getenv("PASSWORD") or ""
if not self.on_ha_addon:
self.username = args.username or os.getenv("USERNAME") or ""
self.using_password = bool(password)
if self.using_password:
self.password_hash = password_hash(password)
self.config_dir = Path(args.configuration)
self.absolute_config_dir = self.config_dir.resolve()
self.verbose = args.verbose
# Set to a sentinel file so .parent gives us the config directory.
# Previously this was `os.path.join(self.config_dir, ".")` which worked because
# os.path.dirname("/config/.") returns "/config", but Path("/config/.").parent
# normalizes to Path("/config") first, then .parent returns Path("/"), breaking
# secret resolution. Using a sentinel file ensures .parent gives the correct directory.
CORE.config_path = self.config_dir / _DASHBOARD_SENTINEL_FILE
@property
def relative_url(self) -> str:
return os.getenv("ESPHOME_DASHBOARD_RELATIVE_URL") or "/"
@property
def status_use_mqtt(self) -> bool:
return get_bool_env("ESPHOME_DASHBOARD_USE_MQTT")
@property
def using_ha_addon_auth(self) -> bool:
if not self.on_ha_addon:
return False
return not get_bool_env("DISABLE_HA_AUTHENTICATION")
@property
def using_auth(self) -> bool:
return self.using_password or self.using_ha_addon_auth
@property
def streamer_mode(self) -> bool:
return get_bool_env("ESPHOME_STREAMER_MODE")
def check_password(self, username: str, password: str) -> bool:
if not self.using_auth:
return True
# Compare in constant running time (to prevent timing attacks)
username_matches = hmac.compare_digest(
username.encode("utf-8"), self.username.encode("utf-8")
)
password_matches = hmac.compare_digest(
self.password_hash, password_hash(password)
)
return username_matches and password_matches
def rel_path(self, *args: Any) -> Path:
"""Return a path relative to the ESPHome config folder."""
joined_path = self.config_dir / Path(*args)
# Raises ValueError if not relative to ESPHome config folder
joined_path.resolve().relative_to(self.absolute_config_dir)
return joined_path

View File

@@ -1,170 +0,0 @@
from __future__ import annotations
import asyncio
import logging
import typing
from zeroconf import AddressResolver, IPVersion
from esphome.address_cache import normalize_hostname
from esphome.zeroconf import (
ESPHOME_SERVICE_TYPE,
AsyncEsphomeZeroconf,
DashboardBrowser,
DashboardImportDiscovery,
DashboardStatus,
DiscoveredImport,
)
from ..const import SENTINEL, DashboardEvent
from ..entries import DashboardEntry, EntryStateSource, bool_to_entry_state
from ..models import build_importable_device_dict
if typing.TYPE_CHECKING:
from ..core import ESPHomeDashboard
_LOGGER = logging.getLogger(__name__)
class MDNSStatus:
"""Class that updates the mdns status."""
def __init__(self, dashboard: ESPHomeDashboard) -> None:
"""Initialize the MDNSStatus class."""
super().__init__()
self.aiozc: AsyncEsphomeZeroconf | None = None
# This is the current mdns state for each host (True, False, None)
self.host_mdns_state: dict[str, bool | None] = {}
self._loop = asyncio.get_running_loop()
self.dashboard = dashboard
def async_setup(self) -> bool:
"""Set up the MDNSStatus class."""
try:
self.aiozc = AsyncEsphomeZeroconf()
except OSError as e:
_LOGGER.warning(
"Failed to initialize zeroconf, will fallback to ping: %s", e
)
return False
return True
async def async_resolve_host(self, host_name: str) -> list[str] | None:
"""Resolve a host name to an address in a thread-safe manner."""
if aiozc := self.aiozc:
return await aiozc.async_resolve_host(host_name)
return None
def get_cached_addresses(self, host_name: str) -> list[str] | None:
"""Get cached addresses for a host without triggering resolution.
Returns None if not in cache or no zeroconf available.
"""
if not self.aiozc:
_LOGGER.debug("No zeroconf instance available for %s", host_name)
return None
# Normalize hostname and get the base name
normalized = normalize_hostname(host_name)
base_name = normalized.partition(".")[0]
# Try to load from zeroconf cache without triggering resolution
resolver_name = f"{base_name}.local."
info = AddressResolver(resolver_name)
# Let zeroconf use its own current time for cache checking
if info.load_from_cache(self.aiozc.zeroconf):
addresses = info.parsed_scoped_addresses(IPVersion.All)
_LOGGER.debug("Found %s in zeroconf cache: %s", resolver_name, addresses)
return addresses
_LOGGER.debug("Not found in zeroconf cache: %s", resolver_name)
return None
def _on_import_update(self, name: str, discovered: DiscoveredImport | None) -> None:
"""Handle importable device updates."""
if discovered is None:
# Device removed
self.dashboard.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_REMOVED, {"name": name}
)
else:
# Device added
self.dashboard.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_ADDED,
{"device": build_importable_device_dict(self.dashboard, discovered)},
)
async def async_refresh_hosts(self) -> None:
"""Refresh the hosts to track."""
dashboard = self.dashboard
host_mdns_state = self.host_mdns_state
entries = dashboard.entries
poll_names: dict[str, set[DashboardEntry]] = {}
for entry in entries.async_all():
if entry.no_mdns:
continue
# If we just adopted/imported this host, we likely
# already have a state for it, so we should make sure
# to set it so the dashboard shows it as online
if entry.loaded_integrations and "api" not in entry.loaded_integrations:
# No api available so we have to poll since
# the device won't respond to a request to ._esphomelib._tcp.local.
poll_names.setdefault(entry.name, set()).add(entry)
elif (online := host_mdns_state.get(entry.name, SENTINEL)) != SENTINEL:
self._async_set_state(entry, online)
if poll_names and self.aiozc:
results = await asyncio.gather(
*(self.aiozc.async_resolve_host(name) for name in poll_names)
)
for name, address_list in zip(poll_names, results, strict=True):
result = bool(address_list)
host_mdns_state[name] = result
for entry in poll_names[name]:
self._async_set_state(entry, result)
def _async_set_state(self, entry: DashboardEntry, result: bool | None) -> None:
"""Set the state of an entry."""
state = bool_to_entry_state(result, EntryStateSource.MDNS)
if result:
# If we can reach it via mDNS, we always set it online
# since its the fastest source if its working
self.dashboard.entries.async_set_state(entry, state)
else:
# However if we can't reach it via mDNS
# we only set it to offline if the state is unknown
# or from mDNS
self.dashboard.entries.async_set_state_if_source(entry, state)
async def async_run(self) -> None:
"""Run the mdns status."""
dashboard = self.dashboard
entries = dashboard.entries
host_mdns_state = self.host_mdns_state
def on_update(dat: dict[str, bool | None]) -> None:
"""Update the entry state."""
for name, result in dat.items():
host_mdns_state[name] = result
if matching_entries := entries.get_by_name(name):
for entry in matching_entries:
self._async_set_state(entry, result)
stat = DashboardStatus(on_update)
imports = DashboardImportDiscovery(self._on_import_update)
dashboard.import_result = imports.import_state
browser = DashboardBrowser(
self.aiozc.zeroconf,
ESPHOME_SERVICE_TYPE,
[stat.browser_callback, imports.browser_callback],
)
ping_request = dashboard.ping_request
while not dashboard.stop_event.is_set():
await self.async_refresh_hosts()
await ping_request.wait()
ping_request.clear()
await browser.async_cancel()
await self.aiozc.async_close()
self.aiozc = None

View File

@@ -1,78 +0,0 @@
from __future__ import annotations
import binascii
import json
import os
import threading
import typing
from esphome import mqtt
from ..entries import EntryStateSource, bool_to_entry_state
if typing.TYPE_CHECKING:
from ..core import ESPHomeDashboard
class MqttStatusThread(threading.Thread):
"""Status thread to get the status of the devices via MQTT."""
def __init__(self, dashboard: ESPHomeDashboard) -> None:
"""Initialize the status thread."""
super().__init__()
self.dashboard = dashboard
def run(self) -> None:
"""Run the status thread."""
dashboard = self.dashboard
entries = dashboard.entries
current_entries = entries.all()
config = mqtt.config_from_env()
topic = "esphome/discover/#"
def on_message(client, userdata, msg):
payload = msg.payload.decode(errors="backslashreplace")
if len(payload) > 0:
data = json.loads(payload)
if "name" not in data:
return
if matching_entries := entries.get_by_name(data["name"]):
for entry in matching_entries:
# Only override state if we don't have a state from another source
# or we have a state from MQTT and the device is reachable
entries.set_state_if_online_or_source(
entry, bool_to_entry_state(True, EntryStateSource.MQTT)
)
def on_connect(client, userdata, flags, return_code):
client.publish("esphome/discover", None, retain=False)
mqttid = str(binascii.hexlify(os.urandom(6)).decode())
client = mqtt.prepare(
config,
[topic],
on_message,
on_connect,
None,
None,
f"esphome-dashboard-{mqttid}",
)
client.loop_start()
while not dashboard.stop_event.wait(2):
current_entries = entries.all()
# will be set to true on on_message
for entry in current_entries:
# Only override state if we don't have a state from another source
entries.set_state_if_source(
entry, bool_to_entry_state(False, EntryStateSource.MQTT)
)
client.publish("esphome/discover", None, retain=False)
dashboard.mqtt_ping_request.wait()
dashboard.mqtt_ping_request.clear()
client.disconnect()
client.loop_stop()

View File

@@ -1,151 +0,0 @@
from __future__ import annotations
import asyncio
import logging
import time
import typing
from typing import cast
from icmplib import Host, SocketPermissionError, async_ping
from ..const import MAX_EXECUTOR_WORKERS
from ..entries import (
DashboardEntry,
EntryState,
EntryStateSource,
ReachableState,
bool_to_entry_state,
)
from ..util.itertools import chunked
if typing.TYPE_CHECKING:
from ..core import ESPHomeDashboard
_LOGGER = logging.getLogger(__name__)
GROUP_SIZE = int(MAX_EXECUTOR_WORKERS / 2)
DNS_FAILURE_STATE = EntryState(ReachableState.DNS_FAILURE, EntryStateSource.PING)
MIN_PING_INTERVAL = 5 # ensure we don't ping too often
class PingStatus:
def __init__(self, dashboard: ESPHomeDashboard) -> None:
"""Initialize the PingStatus class."""
super().__init__()
self._loop = asyncio.get_running_loop()
self.dashboard = dashboard
async def async_run(self) -> None:
"""Run the ping status."""
dashboard = self.dashboard
entries = dashboard.entries
privileged = await _can_use_icmp_lib_with_privilege()
if privileged is None:
_LOGGER.warning("Cannot use icmplib because privileges are insufficient")
return
while not dashboard.stop_event.is_set():
# Only ping if the dashboard is open
await dashboard.ping_request.wait()
dashboard.ping_request.clear()
iteration_start = time.monotonic()
current_entries = dashboard.entries.async_all()
to_ping: list[DashboardEntry] = []
for entry in current_entries:
if entry.address is None:
# No address or we already have a state from another source
# so no need to ping
continue
if (
entry.state.reachable is ReachableState.ONLINE
and entry.state.source
not in (EntryStateSource.PING, EntryStateSource.UNKNOWN)
):
# If we already have a state from another source and
# it's online, we don't need to ping
continue
to_ping.append(entry)
# Resolve DNS for all entries
entries_with_addresses: dict[DashboardEntry, list[str]] = {}
for ping_group in chunked(to_ping, GROUP_SIZE):
ping_group = cast(list[DashboardEntry], ping_group)
now_monotonic = time.monotonic()
dns_results = await asyncio.gather(
*(
dashboard.dns_cache.async_resolve(entry.address, now_monotonic)
for entry in ping_group
),
return_exceptions=True,
)
for entry, result in zip(ping_group, dns_results, strict=True):
if isinstance(result, Exception):
# Only update state if its unknown or from ping
# so we don't mark it as offline if we have a state
# from mDNS or MQTT
entries.async_set_state_if_source(entry, DNS_FAILURE_STATE)
continue
if isinstance(result, BaseException):
raise result
entries_with_addresses[entry] = result
# Ping all entries with valid addresses
for ping_group in chunked(entries_with_addresses.items(), GROUP_SIZE):
entry_addresses = cast(tuple[DashboardEntry, list[str]], ping_group)
results = await asyncio.gather(
*(
async_ping(addresses[0], privileged=privileged)
for _, addresses in entry_addresses
),
return_exceptions=True,
)
for entry_address, result in zip(entry_addresses, results, strict=True):
if isinstance(result, Exception):
ping_result = False
elif isinstance(result, BaseException):
raise result
else:
host: Host = result
ping_result = host.is_alive
entry: DashboardEntry = entry_address[0]
# If we can reach it via ping, we always set it
# online, however if we can't reach it via ping
# we only set it to offline if the state is unknown
# or from ping
entries.async_set_state_if_online_or_source(
entry,
bool_to_entry_state(ping_result, EntryStateSource.PING),
)
if not dashboard.stop_event.is_set():
iteration_duration = time.monotonic() - iteration_start
if iteration_duration < MIN_PING_INTERVAL:
await asyncio.sleep(MIN_PING_INTERVAL - iteration_duration)
async def _can_use_icmp_lib_with_privilege() -> None | bool:
"""Verify we can create a raw socket."""
try:
await async_ping("127.0.0.1", count=0, timeout=0, privileged=True)
except SocketPermissionError:
try:
await async_ping("127.0.0.1", count=0, timeout=0, privileged=False)
except SocketPermissionError:
_LOGGER.debug(
"Cannot use icmplib because privileges are insufficient to create the"
" socket"
)
return None
_LOGGER.debug("Using icmplib in privileged=False mode")
return False
_LOGGER.debug("Using icmplib in privileged=True mode")
return True

View File

@@ -1,22 +0,0 @@
from __future__ import annotations
from collections.abc import Iterable
from functools import partial
from itertools import islice
from typing import Any
def take(take_num: int, iterable: Iterable) -> list[Any]:
"""Return first n items of the iterable as a list.
From itertools recipes
"""
return list(islice(iterable, take_num))
def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
"""Break *iterable* into lists of length *n*.
From more-itertools
"""
return iter(partial(take, chunked_num, iter(iterable)), [])

View File

@@ -1,11 +0,0 @@
from __future__ import annotations
import hashlib
def password_hash(password: str) -> bytes:
"""Create a hash of a password to transform it to a fixed-length digest.
Note this is not meant for secure storage, but for securely comparing passwords.
"""
return hashlib.sha256(password.encode()).digest()

View File

@@ -1,31 +0,0 @@
from __future__ import annotations
import asyncio
from collections.abc import Iterable
async def async_system_command_status(command: Iterable[str]) -> bool:
"""Run a system command checking only the status."""
process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
close_fds=False,
)
await process.wait()
return process.returncode == 0
async def async_run_system_command(command: Iterable[str]) -> tuple[bool, bytes, bytes]:
"""Run a system command and return a tuple of returncode, stdout, stderr."""
process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=False,
)
stdout, stderr = await process.communicate()
await process.wait()
return process.returncode, stdout, stderr

View File

@@ -1,15 +0,0 @@
"""Back-compat shim for ``friendly_name_slugify``.
The function moved to :mod:`esphome.helpers` so it survives the legacy
dashboard's eventual removal — see the
``esphome.helpers.friendly_name_slugify`` docstring. This module
re-exports the name so existing
``from esphome.dashboard.util.text import friendly_name_slugify``
imports keep working while downstream consumers migrate.
"""
from __future__ import annotations
from esphome.helpers import friendly_name_slugify
__all__ = ["friendly_name_slugify"]

File diff suppressed because it is too large Load Diff

View File

@@ -124,14 +124,8 @@ def slugify(value: str) -> str:
def friendly_name_slugify(value: str) -> str: def friendly_name_slugify(value: str) -> str:
"""Convert a friendly name to a slug with dashes instead of underscores. """Convert a friendly name to a slug with dashes instead of underscores.
Used by: Used by device-builder (esphome/device-builder), which slugifies friendly
- esphome.dashboard.web_server (legacy dashboard) names into the YAML filename / device name during adoption + wizard flows.
- device-builder (esphome/device-builder) — slugifies friendly names
into the YAML filename / device name during adoption + wizard flows.
Lives here rather than in ``esphome.dashboard.util.text`` so it
survives the legacy dashboard's eventual removal.
The dashboard module re-exports this name as a back-compat shim.
Coordinate with the device-builder team before changing the Coordinate with the device-builder team before changing the
slugification rules — the mapping must stay stable so existing slugification rules — the mapping must stay stable so existing
on-disk filenames keep matching across releases. on-disk filenames keep matching across releases.

View File

@@ -71,13 +71,9 @@ def _to_path_if_not_none(value: str | None) -> Path | None:
class StorageJSON: class StorageJSON:
"""Persisted device metadata sidecar. """Persisted device metadata sidecar.
Used by: Used by device-builder (esphome/device-builder), which reads/writes this
- esphome.dashboard (legacy dashboard) JSON file. The schema (``storage_version``, field names, types) must stay
- device-builder (esphome/device-builder) — reads/writes the same backwards compatible — coordinate with the device-builder team before
JSON file as the legacy dashboard so a single config_dir can be
shared between the two during the transition. The schema
(``storage_version``, field names, types) must stay backwards
compatible — coordinate with the device-builder team before
adding required fields or changing semantics of existing ones. adding required fields or changing semantics of existing ones.
""" """

View File

@@ -62,14 +62,12 @@ TXT_RECORD_VERSION = b"version"
class DiscoveredImport: class DiscoveredImport:
"""An importable device discovered via mDNS ``_esphomelib._tcp.local.``. """An importable device discovered via mDNS ``_esphomelib._tcp.local.``.
Used by: Used by device-builder (esphome/device-builder), which surfaces these as
- esphome.dashboard (legacy dashboard) "discovered devices" on its adoption flow.
- device-builder (esphome/device-builder) — surfaces these as
"discovered devices" on the new dashboard's adoption flow.
Fields are populated from TXT records on the broadcast service Fields are populated from TXT records on the broadcast service
info (see :class:`DashboardImportDiscovery`). Coordinate before info (see :class:`DashboardImportDiscovery`). Coordinate before
adding/removing fields — both consumers persist them. adding/removing fields — the consumer persists them.
""" """
friendly_name: str | None friendly_name: str | None
@@ -87,10 +85,8 @@ class DashboardBrowser(AsyncServiceBrowser):
class DashboardImportDiscovery: class DashboardImportDiscovery:
"""Track importable devices announcing on ``_esphomelib._tcp.local.``. """Track importable devices announcing on ``_esphomelib._tcp.local.``.
Used by: Used by device-builder (esphome/device-builder), which wires it up
- esphome.dashboard (legacy dashboard) alongside its own ``ServiceBrowser`` to populate the
- device-builder (esphome/device-builder) — wired up alongside
the dashboard's own ``ServiceBrowser`` to populate the
"Discovered devices" panel and the adoption flow. "Discovered devices" panel and the adoption flow.
The class maintains ``import_state: dict[str, DiscoveredImport]`` The class maintains ``import_state: dict[str, DiscoveredImport]``
@@ -262,9 +258,7 @@ async def async_resolve_hosts(
class AsyncEsphomeZeroconf(AsyncZeroconf): class AsyncEsphomeZeroconf(AsyncZeroconf):
"""ESPHome-tuned ``AsyncZeroconf`` with a hostname-resolve helper. """ESPHome-tuned ``AsyncZeroconf`` with a hostname-resolve helper.
Used by: Used by device-builder (esphome/device-builder), which drives both the live
- esphome.dashboard (legacy dashboard)
- device-builder (esphome/device-builder) — drives both the live
mDNS browser and the per-sweep ``async_resolve_host`` fallback mDNS browser and the per-sweep ``async_resolve_host`` fallback
for non-API devices that don't broadcast esphomelib. for non-API devices that don't broadcast esphomelib.

View File

@@ -3,15 +3,12 @@ voluptuous==0.16.0
PyYAML==6.0.3 PyYAML==6.0.3
paho-mqtt==1.6.1 paho-mqtt==1.6.1
colorama==0.4.6 colorama==0.4.6
icmplib==3.0.4
tornado==6.5.7
tzlocal==5.4.3 # from time tzlocal==5.4.3 # from time
tzdata>=2026.2 # from time tzdata>=2026.2 # from time
pyserial==3.5 pyserial==3.5
platformio==6.1.19 platformio==6.1.19
esptool==5.3.0 esptool==5.3.0
click==8.3.3 click==8.3.3
esphome-dashboard==20260425.0
aioesphomeapi==45.3.1 aioesphomeapi==45.3.1
zeroconf==0.149.16 zeroconf==0.149.16
puremagic==1.30 puremagic==1.30

View File

@@ -259,14 +259,7 @@ def lint_executable_bit(fname: Path) -> str | None:
return None return None
@lint_content_find_check( @lint_content_find_check("\t", only_first=True)
"\t",
only_first=True,
exclude=[
"esphome/dashboard/static/ace.js",
"esphome/dashboard/static/ext-searchbox.js",
],
)
def lint_tabs(fname, line, col, content): def lint_tabs(fname, line, col, content):
return "File contains tab character. Please convert tabs to spaces." return "File contains tab character. Please convert tabs to spaces."

View File

@@ -1,6 +0,0 @@
import pathlib
def get_fixture_path(filename: str) -> pathlib.Path:
"""Get path of fixture."""
return pathlib.Path(__file__).parent.joinpath("fixtures", filename)

View File

@@ -1,43 +0,0 @@
"""Common fixtures for dashboard tests."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, Mock
import pytest
import pytest_asyncio
from esphome.dashboard.core import ESPHomeDashboard
from esphome.dashboard.entries import DashboardEntries
@pytest.fixture
def mock_settings(tmp_path: Path) -> MagicMock:
"""Create mock dashboard settings."""
settings = MagicMock()
settings.config_dir = str(tmp_path)
settings.absolute_config_dir = tmp_path
return settings
@pytest.fixture
def mock_dashboard(mock_settings: MagicMock) -> Mock:
"""Create a mock dashboard."""
dashboard = Mock(spec=ESPHomeDashboard)
dashboard.settings = mock_settings
dashboard.entries = Mock()
dashboard.entries.async_all.return_value = []
dashboard.stop_event = Mock()
dashboard.stop_event.is_set.return_value = True
dashboard.ping_request = Mock()
dashboard.ignored_devices = set()
dashboard.bus = Mock()
dashboard.bus.async_fire = Mock()
return dashboard
@pytest_asyncio.fixture
async def dashboard_entries(mock_dashboard: Mock) -> DashboardEntries:
"""Create a DashboardEntries instance for testing."""
return DashboardEntries(mock_dashboard)

View File

@@ -1,47 +0,0 @@
substitutions:
name: picoproxy
friendly_name: Pico Proxy
esphome:
name: ${name}
friendly_name: ${friendly_name}
project:
name: esphome.bluetooth-proxy
version: "1.0"
esp32:
board: esp32dev
framework:
type: esp-idf
wifi:
ap:
api:
logger:
ota:
improv_serial:
dashboard_import:
package_import_url: github://esphome/firmware/bluetooth-proxy/esp32-generic.yaml@main
button:
- platform: factory_reset
id: resetf
- platform: safe_mode
name: Safe Mode Boot
entity_category: diagnostic
sensor:
- platform: template
id: pm11
name: "pm 1.0µm"
lambda: return 1.0;
- platform: template
id: pm251
name: "pm 2.5µm"
lambda: return 2.5;
- platform: template
id: pm101
name: "pm 10µm"
lambda: return 10;

View File

@@ -1,199 +0,0 @@
"""Unit tests for esphome.dashboard.dns module."""
from __future__ import annotations
import time
from unittest.mock import AsyncMock, patch
from icmplib import NameLookupError
import pytest
from esphome.dashboard.dns import DNSCache, _async_resolve_wrapper
@pytest.fixture
def dns_cache_fixture() -> DNSCache:
"""Create a DNSCache instance."""
return DNSCache()
def test_get_cached_addresses_not_in_cache(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses when hostname is not in cache."""
now = time.monotonic()
result = dns_cache_fixture.get_cached_addresses("unknown.example.com", now)
assert result is None
def test_get_cached_addresses_expired(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses when cache entry is expired."""
now = time.monotonic()
# Add entry that's already expired
dns_cache_fixture._cache["example.com"] = (now - 1, ["192.168.1.10"])
result = dns_cache_fixture.get_cached_addresses("example.com", now)
assert result is None
# Expired entry should still be in cache (not removed by get_cached_addresses)
assert "example.com" in dns_cache_fixture._cache
def test_get_cached_addresses_valid(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses with valid cache entry."""
now = time.monotonic()
# Add entry that expires in 60 seconds
dns_cache_fixture._cache["example.com"] = (
now + 60,
["192.168.1.10", "192.168.1.11"],
)
result = dns_cache_fixture.get_cached_addresses("example.com", now)
assert result == ["192.168.1.10", "192.168.1.11"]
# Entry should still be in cache
assert "example.com" in dns_cache_fixture._cache
def test_get_cached_addresses_hostname_normalization(
dns_cache_fixture: DNSCache,
) -> None:
"""Test get_cached_addresses normalizes hostname."""
now = time.monotonic()
# Add entry with lowercase hostname
dns_cache_fixture._cache["example.com"] = (now + 60, ["192.168.1.10"])
# Test with various forms
assert dns_cache_fixture.get_cached_addresses("EXAMPLE.COM", now) == [
"192.168.1.10"
]
assert dns_cache_fixture.get_cached_addresses("example.com.", now) == [
"192.168.1.10"
]
assert dns_cache_fixture.get_cached_addresses("EXAMPLE.COM.", now) == [
"192.168.1.10"
]
def test_get_cached_addresses_ipv6(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses with IPv6 addresses."""
now = time.monotonic()
dns_cache_fixture._cache["example.com"] = (now + 60, ["2001:db8::1", "fe80::1"])
result = dns_cache_fixture.get_cached_addresses("example.com", now)
assert result == ["2001:db8::1", "fe80::1"]
def test_get_cached_addresses_empty_list(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses with empty address list."""
now = time.monotonic()
dns_cache_fixture._cache["example.com"] = (now + 60, [])
result = dns_cache_fixture.get_cached_addresses("example.com", now)
assert result == []
def test_get_cached_addresses_exception_in_cache(dns_cache_fixture: DNSCache) -> None:
"""Test get_cached_addresses when cache contains an exception."""
now = time.monotonic()
# Store an exception (from failed resolution)
dns_cache_fixture._cache["example.com"] = (now + 60, OSError("Resolution failed"))
result = dns_cache_fixture.get_cached_addresses("example.com", now)
assert result is None # Should return None for exceptions
def test_async_resolve_not_called(dns_cache_fixture: DNSCache) -> None:
"""Test that get_cached_addresses never calls async_resolve."""
now = time.monotonic()
with patch.object(dns_cache_fixture, "async_resolve") as mock_resolve:
# Test non-cached
result = dns_cache_fixture.get_cached_addresses("uncached.com", now)
assert result is None
mock_resolve.assert_not_called()
# Test expired
dns_cache_fixture._cache["expired.com"] = (now - 1, ["192.168.1.10"])
result = dns_cache_fixture.get_cached_addresses("expired.com", now)
assert result is None
mock_resolve.assert_not_called()
# Test valid
dns_cache_fixture._cache["valid.com"] = (now + 60, ["192.168.1.10"])
result = dns_cache_fixture.get_cached_addresses("valid.com", now)
assert result == ["192.168.1.10"]
mock_resolve.assert_not_called()
@pytest.mark.asyncio
async def test_async_resolve_wrapper_ip_address() -> None:
"""Test _async_resolve_wrapper returns IP address directly."""
result = await _async_resolve_wrapper("192.168.1.10")
assert result == ["192.168.1.10"]
result = await _async_resolve_wrapper("2001:db8::1")
assert result == ["2001:db8::1"]
@pytest.mark.asyncio
async def test_async_resolve_wrapper_local_fallback_success() -> None:
"""Test _async_resolve_wrapper falls back to bare hostname for .local."""
mock_resolve = AsyncMock()
# First call (device.local) fails, second call (device) succeeds
mock_resolve.side_effect = [
NameLookupError("device.local"),
["192.168.1.50"],
]
with patch("esphome.dashboard.dns.async_resolve", mock_resolve):
result = await _async_resolve_wrapper("device.local")
assert result == ["192.168.1.50"]
assert mock_resolve.call_count == 2
mock_resolve.assert_any_call("device.local")
mock_resolve.assert_any_call("device")
@pytest.mark.asyncio
async def test_async_resolve_wrapper_local_fallback_both_fail() -> None:
"""Test _async_resolve_wrapper returns exception when both fail."""
mock_resolve = AsyncMock()
original_exception = NameLookupError("device.local")
mock_resolve.side_effect = [
original_exception,
NameLookupError("device"),
]
with patch("esphome.dashboard.dns.async_resolve", mock_resolve):
result = await _async_resolve_wrapper("device.local")
# Should return the original exception, not the fallback exception
assert result is original_exception
assert mock_resolve.call_count == 2
@pytest.mark.asyncio
async def test_async_resolve_wrapper_non_local_no_fallback() -> None:
"""Test _async_resolve_wrapper doesn't fallback for non-.local hostnames."""
mock_resolve = AsyncMock()
original_exception = NameLookupError("device.example.com")
mock_resolve.side_effect = original_exception
with patch("esphome.dashboard.dns.async_resolve", mock_resolve):
result = await _async_resolve_wrapper("device.example.com")
assert result is original_exception
# Should only try the original hostname, no fallback
assert mock_resolve.call_count == 1
mock_resolve.assert_called_once_with("device.example.com")
@pytest.mark.asyncio
async def test_async_resolve_wrapper_local_success_no_fallback() -> None:
"""Test _async_resolve_wrapper doesn't fallback when .local succeeds."""
mock_resolve = AsyncMock(return_value=["192.168.1.50"])
with patch("esphome.dashboard.dns.async_resolve", mock_resolve):
result = await _async_resolve_wrapper("device.local")
assert result == ["192.168.1.50"]
# Should only try once since it succeeded
assert mock_resolve.call_count == 1
mock_resolve.assert_called_once_with("device.local")

View File

@@ -1,240 +0,0 @@
"""Unit tests for esphome.dashboard.status.mdns module."""
from __future__ import annotations
from unittest.mock import Mock, patch
import pytest
import pytest_asyncio
from zeroconf import AddressResolver, IPVersion
from esphome.dashboard.const import DashboardEvent
from esphome.dashboard.status.mdns import MDNSStatus
from esphome.zeroconf import DiscoveredImport
@pytest_asyncio.fixture
async def mdns_status(mock_dashboard: Mock) -> MDNSStatus:
"""Create an MDNSStatus instance in async context."""
# We're in an async context so get_running_loop will work
return MDNSStatus(mock_dashboard)
@pytest.mark.asyncio
async def test_get_cached_addresses_no_zeroconf(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses when no zeroconf instance is available."""
mdns_status.aiozc = None
result = mdns_status.get_cached_addresses("device.local")
assert result is None
@pytest.mark.asyncio
async def test_get_cached_addresses_not_in_cache(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses when address is not in cache."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = False
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device.local")
assert result is None
mock_info.load_from_cache.assert_called_once_with(mdns_status.aiozc.zeroconf)
@pytest.mark.asyncio
async def test_get_cached_addresses_found_in_cache(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses when address is found in cache."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = ["192.168.1.10", "fe80::1"]
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device.local")
assert result == ["192.168.1.10", "fe80::1"]
mock_info.load_from_cache.assert_called_once_with(mdns_status.aiozc.zeroconf)
mock_info.parsed_scoped_addresses.assert_called_once_with(IPVersion.All)
@pytest.mark.asyncio
async def test_get_cached_addresses_with_trailing_dot(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses with hostname having trailing dot."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = ["192.168.1.10"]
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device.local.")
assert result == ["192.168.1.10"]
# Should normalize to device.local. for zeroconf
mock_resolver.assert_called_once_with("device.local.")
@pytest.mark.asyncio
async def test_get_cached_addresses_uppercase_hostname(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses with uppercase hostname."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = ["192.168.1.10"]
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("DEVICE.LOCAL")
assert result == ["192.168.1.10"]
# Should normalize to device.local. for zeroconf
mock_resolver.assert_called_once_with("device.local.")
@pytest.mark.asyncio
async def test_get_cached_addresses_simple_hostname(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses with simple hostname (no domain)."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = ["192.168.1.10"]
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device")
assert result == ["192.168.1.10"]
# Should append .local. for zeroconf
mock_resolver.assert_called_once_with("device.local.")
@pytest.mark.asyncio
async def test_get_cached_addresses_ipv6_only(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses returning only IPv6 addresses."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = ["fe80::1", "2001:db8::1"]
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device.local")
assert result == ["fe80::1", "2001:db8::1"]
@pytest.mark.asyncio
async def test_get_cached_addresses_empty_list(mdns_status: MDNSStatus) -> None:
"""Test get_cached_addresses returning empty list from cache."""
mdns_status.aiozc = Mock()
mdns_status.aiozc.zeroconf = Mock()
with patch("esphome.dashboard.status.mdns.AddressResolver") as mock_resolver:
mock_info = Mock(spec=AddressResolver)
mock_info.load_from_cache.return_value = True
mock_info.parsed_scoped_addresses.return_value = []
mock_resolver.return_value = mock_info
result = mdns_status.get_cached_addresses("device.local")
assert result == []
@pytest.mark.asyncio
async def test_async_setup_success(mock_dashboard: Mock) -> None:
"""Test successful async_setup."""
mdns_status = MDNSStatus(mock_dashboard)
with patch("esphome.dashboard.status.mdns.AsyncEsphomeZeroconf") as mock_zc:
mock_zc.return_value = Mock()
result = mdns_status.async_setup()
assert result is True
assert mdns_status.aiozc is not None
@pytest.mark.asyncio
async def test_async_setup_failure(mock_dashboard: Mock) -> None:
"""Test async_setup with OSError."""
mdns_status = MDNSStatus(mock_dashboard)
with patch("esphome.dashboard.status.mdns.AsyncEsphomeZeroconf") as mock_zc:
mock_zc.side_effect = OSError("Network error")
result = mdns_status.async_setup()
assert result is False
assert mdns_status.aiozc is None
@pytest.mark.asyncio
async def test_on_import_update_device_added(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is added."""
# Create a DiscoveredImport object
discovered = DiscoveredImport(
device_name="test_device",
friendly_name="Test Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="wifi",
)
# Call _on_import_update with a device
mdns_status._on_import_update("test_device", discovered)
# Should fire IMPORTABLE_DEVICE_ADDED event
mock_dashboard = mdns_status.dashboard
mock_dashboard.bus.async_fire.assert_called_once()
call_args = mock_dashboard.bus.async_fire.call_args
assert call_args[0][0] == DashboardEvent.IMPORTABLE_DEVICE_ADDED
assert "device" in call_args[0][1]
device_data = call_args[0][1]["device"]
assert device_data["name"] == "test_device"
assert device_data["friendly_name"] == "Test Device"
assert device_data["project_name"] == "test_project"
assert device_data["ignored"] is False
@pytest.mark.asyncio
async def test_on_import_update_device_ignored(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is ignored."""
# Add device to ignored list
mdns_status.dashboard.ignored_devices.add("ignored_device")
# Create a DiscoveredImport object for ignored device
discovered = DiscoveredImport(
device_name="ignored_device",
friendly_name="Ignored Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="ethernet",
)
# Call _on_import_update with an ignored device
mdns_status._on_import_update("ignored_device", discovered)
# Should fire IMPORTABLE_DEVICE_ADDED event with ignored=True
mock_dashboard = mdns_status.dashboard
mock_dashboard.bus.async_fire.assert_called_once()
call_args = mock_dashboard.bus.async_fire.call_args
assert call_args[0][0] == DashboardEvent.IMPORTABLE_DEVICE_ADDED
device_data = call_args[0][1]["device"]
assert device_data["name"] == "ignored_device"
assert device_data["ignored"] is True
@pytest.mark.asyncio
async def test_on_import_update_device_removed(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is removed."""
# Call _on_import_update with None (device removed)
mdns_status._on_import_update("removed_device", None)
# Should fire IMPORTABLE_DEVICE_REMOVED event
mdns_status.dashboard.bus.async_fire.assert_called_once_with(
DashboardEvent.IMPORTABLE_DEVICE_REMOVED, {"name": "removed_device"}
)

View File

@@ -1,288 +0,0 @@
"""Tests for dashboard entries Path-related functionality."""
from __future__ import annotations
import os
from pathlib import Path
import tempfile
from unittest.mock import Mock
import pytest
from esphome.core import CORE
from esphome.dashboard.const import DashboardEvent
from esphome.dashboard.entries import DashboardEntries, DashboardEntry
def create_cache_key() -> tuple[int, int, float, int]:
"""Helper to create a valid DashboardCacheKeyType."""
return (0, 0, 0.0, 0)
@pytest.fixture(autouse=True)
def setup_core():
"""Set up CORE for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
CORE.config_path = Path(tmpdir) / "test.yaml"
yield
CORE.reset()
def test_dashboard_entry_path_initialization() -> None:
"""Test DashboardEntry initializes with path correctly."""
test_path = Path("/test/config/device.yaml")
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
assert entry.cache_key == cache_key
def test_dashboard_entry_path_with_absolute_path() -> None:
"""Test DashboardEntry handles absolute paths."""
# Use a truly absolute path for the platform
test_path = Path.cwd() / "absolute" / "path" / "to" / "config.yaml"
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
assert entry.path.is_absolute()
def test_dashboard_entry_path_with_relative_path() -> None:
"""Test DashboardEntry handles relative paths."""
test_path = Path("configs/device.yaml")
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
assert not entry.path.is_absolute()
@pytest.mark.asyncio
async def test_dashboard_entries_get_by_path(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test getting entry by path."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# Update entries to load the file
await dashboard_entries.async_update_entries()
# Verify the entry was loaded
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
assert entry.path == test_file
# Also verify get() works with Path
result = dashboard_entries.get(test_file)
assert result == entry
@pytest.mark.asyncio
async def test_dashboard_entries_get_nonexistent_path(
dashboard_entries: DashboardEntries,
) -> None:
"""Test getting non-existent entry returns None."""
result = dashboard_entries.get("/nonexistent/path.yaml")
assert result is None
@pytest.mark.asyncio
async def test_dashboard_entries_path_normalization(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test that paths are handled consistently."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# Update entries to load the file
await dashboard_entries.async_update_entries()
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_spaces(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test handling paths with spaces."""
# Create a test file with spaces in name
test_file = tmp_path / "my device.yaml"
test_file.write_text("test config")
# Update entries to load the file
await dashboard_entries.async_update_entries()
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
assert result.path == test_file
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_special_chars(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test handling paths with special characters."""
# Create a test file with special characters
test_file = tmp_path / "device-01_test.yaml"
test_file.write_text("test config")
# Update entries to load the file
await dashboard_entries.async_update_entries()
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
def test_dashboard_entries_windows_path() -> None:
"""Test handling Windows-style paths."""
test_path = Path(r"C:\Users\test\esphome\device.yaml")
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
@pytest.mark.asyncio
async def test_dashboard_entries_path_to_cache_key_mapping(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test internal entries storage with paths and cache keys."""
# Create test files
file1 = tmp_path / "device1.yaml"
file2 = tmp_path / "device2.yaml"
file1.write_text("test config 1")
file2.write_text("test config 2")
# Update entries to load the files
await dashboard_entries.async_update_entries()
# Get entries and verify they have different cache keys
entry1 = dashboard_entries.get(file1)
entry2 = dashboard_entries.get(file2)
assert entry1 is not None
assert entry2 is not None
assert entry1.cache_key != entry2.cache_key
def test_dashboard_entry_path_property() -> None:
"""Test that path property returns expected value."""
test_path = Path("/test/config/device.yaml")
entry = DashboardEntry(test_path, create_cache_key())
assert entry.path == test_path
assert isinstance(entry.path, Path)
@pytest.mark.asyncio
async def test_dashboard_entries_all_returns_entries_with_paths(
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test that all() returns entries with their paths intact."""
# Create test files
files = [
tmp_path / "device1.yaml",
tmp_path / "device2.yaml",
tmp_path / "device3.yaml",
]
for file in files:
file.write_text("test config")
# Update entries to load the files
await dashboard_entries.async_update_entries()
all_entries = dashboard_entries.async_all()
assert len(all_entries) == len(files)
retrieved_paths = [entry.path for entry in all_entries]
assert set(retrieved_paths) == set(files)
@pytest.mark.asyncio
async def test_async_update_entries_removed_path(
dashboard_entries: DashboardEntries, mock_dashboard: Mock, tmp_path: Path
) -> None:
"""Test that removed files trigger ENTRY_REMOVED event."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# First update to add the entry
await dashboard_entries.async_update_entries()
# Verify entry was added
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
# Delete the file
test_file.unlink()
# Second update to detect removal
await dashboard_entries.async_update_entries()
# Verify entry was removed
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 0
# Verify ENTRY_REMOVED event was fired
mock_dashboard.bus.async_fire.assert_any_call(
DashboardEvent.ENTRY_REMOVED, {"entry": entry}
)
@pytest.mark.asyncio
async def test_async_update_entries_updated_path(
dashboard_entries: DashboardEntries, mock_dashboard: Mock, tmp_path: Path
) -> None:
"""Test that modified files trigger ENTRY_UPDATED event."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# First update to add the entry
await dashboard_entries.async_update_entries()
# Verify entry was added
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
original_cache_key = entry.cache_key
# Modify the file to change its mtime
test_file.write_text("updated config")
# Explicitly change the mtime to ensure it's different
stat = test_file.stat()
os.utime(test_file, (stat.st_atime, stat.st_mtime + 1))
# Second update to detect modification
await dashboard_entries.async_update_entries()
# Verify entry is still there with updated cache key
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
updated_entry = all_entries[0]
assert updated_entry == entry # Same entry object
assert updated_entry.cache_key != original_cache_key # But cache key updated
# Verify ENTRY_UPDATED event was fired
mock_dashboard.bus.async_fire.assert_any_call(
DashboardEvent.ENTRY_UPDATED, {"entry": entry}
)

View File

@@ -1,287 +0,0 @@
"""Tests for DashboardSettings (path resolution and authentication)."""
from __future__ import annotations
from argparse import Namespace
from pathlib import Path
import tempfile
import pytest
from esphome.core import CORE
from esphome.dashboard.settings import DashboardSettings
from esphome.dashboard.util.password import password_hash
@pytest.fixture
def dashboard_settings(tmp_path: Path) -> DashboardSettings:
"""Create DashboardSettings instance with temp directory."""
settings = DashboardSettings()
# Resolve symlinks to ensure paths match
resolved_dir = tmp_path.resolve()
settings.config_dir = resolved_dir
settings.absolute_config_dir = resolved_dir
return settings
def test_rel_path_simple(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with simple relative path."""
result = dashboard_settings.rel_path("config.yaml")
expected = dashboard_settings.config_dir / "config.yaml"
assert result == expected
def test_rel_path_multiple_components(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with multiple path components."""
result = dashboard_settings.rel_path("subfolder", "device", "config.yaml")
expected = dashboard_settings.config_dir / "subfolder" / "device" / "config.yaml"
assert result == expected
def test_rel_path_with_dots(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path prevents directory traversal."""
# This should raise ValueError as it tries to go outside config_dir
with pytest.raises(ValueError):
dashboard_settings.rel_path("..", "outside.yaml")
def test_rel_path_absolute_path_within_config(
dashboard_settings: DashboardSettings,
) -> None:
"""Test rel_path with absolute path that's within config dir."""
internal_path = dashboard_settings.absolute_config_dir / "internal.yaml"
internal_path.touch()
result = dashboard_settings.rel_path("internal.yaml")
expected = dashboard_settings.config_dir / "internal.yaml"
assert result == expected
def test_rel_path_absolute_path_outside_config(
dashboard_settings: DashboardSettings,
) -> None:
"""Test rel_path with absolute path outside config dir raises error."""
outside_path = "/tmp/outside/config.yaml"
with pytest.raises(ValueError):
dashboard_settings.rel_path(outside_path)
def test_rel_path_empty_args(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with no arguments returns config_dir."""
result = dashboard_settings.rel_path()
assert result == dashboard_settings.config_dir
def test_rel_path_with_pathlib_path(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path works with Path objects as arguments."""
path_obj = Path("subfolder") / "config.yaml"
result = dashboard_settings.rel_path(path_obj)
expected = dashboard_settings.config_dir / "subfolder" / "config.yaml"
assert result == expected
def test_rel_path_normalizes_slashes(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path normalizes path separators."""
# os.path.join normalizes slashes on Windows but preserves them on Unix
# Test that providing components separately gives same result
result1 = dashboard_settings.rel_path("folder", "subfolder", "file.yaml")
result2 = dashboard_settings.rel_path("folder", "subfolder", "file.yaml")
assert result1 == result2
# Also test that the result is as expected
expected = dashboard_settings.config_dir / "folder" / "subfolder" / "file.yaml"
assert result1 == expected
def test_rel_path_handles_spaces(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles paths with spaces."""
result = dashboard_settings.rel_path("my folder", "my config.yaml")
expected = dashboard_settings.config_dir / "my folder" / "my config.yaml"
assert result == expected
def test_rel_path_handles_special_chars(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles paths with special characters."""
result = dashboard_settings.rel_path("device-01_test", "config.yaml")
expected = dashboard_settings.config_dir / "device-01_test" / "config.yaml"
assert result == expected
def test_config_dir_as_path_property(dashboard_settings: DashboardSettings) -> None:
"""Test that config_dir can be accessed and used with Path operations."""
config_path = dashboard_settings.config_dir
assert config_path.exists()
assert config_path.is_dir()
assert config_path.is_absolute()
def test_absolute_config_dir_property(dashboard_settings: DashboardSettings) -> None:
"""Test absolute_config_dir is a Path object."""
assert isinstance(dashboard_settings.absolute_config_dir, Path)
assert dashboard_settings.absolute_config_dir.exists()
assert dashboard_settings.absolute_config_dir.is_dir()
assert dashboard_settings.absolute_config_dir.is_absolute()
def test_rel_path_symlink_inside_config(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with symlink that points inside config dir."""
target = dashboard_settings.absolute_config_dir / "target.yaml"
target.touch()
symlink = dashboard_settings.absolute_config_dir / "link.yaml"
symlink.symlink_to(target)
result = dashboard_settings.rel_path("link.yaml")
expected = dashboard_settings.config_dir / "link.yaml"
assert result == expected
def test_rel_path_symlink_outside_config(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with symlink that points outside config dir."""
with tempfile.NamedTemporaryFile(suffix=".yaml") as tmp:
symlink = dashboard_settings.absolute_config_dir / "external_link.yaml"
symlink.symlink_to(tmp.name)
with pytest.raises(ValueError):
dashboard_settings.rel_path("external_link.yaml")
def test_rel_path_with_none_arg(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles None arguments gracefully."""
result = dashboard_settings.rel_path("None")
expected = dashboard_settings.config_dir / "None"
assert result == expected
def test_rel_path_with_numeric_args(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles numeric arguments."""
result = dashboard_settings.rel_path("123", "456.789")
expected = dashboard_settings.config_dir / "123" / "456.789"
assert result == expected
def test_config_path_parent_resolves_to_config_dir(tmp_path: Path) -> None:
"""Test that CORE.config_path.parent resolves to config_dir after parse_args.
This is a regression test for issue #11280 where binary download failed
when using packages with secrets after the Path migration in 2025.10.0.
The issue was that after switching from os.path to Path:
- Before: os.path.dirname("/config/.") → "/config"
- After: Path("/config/.").parent → Path("/") (normalized first!)
The fix uses a sentinel file so .parent returns the correct directory:
- Fixed: Path("/config/___DASHBOARD_SENTINEL___.yaml").parent → Path("/config")
"""
# Create test directory structure with secrets and packages
config_dir = tmp_path / "config"
config_dir.mkdir()
# Create secrets.yaml with obviously fake test values
secrets_file = config_dir / "secrets.yaml"
secrets_file.write_text(
"wifi_ssid: TEST-DUMMY-SSID\n"
"wifi_password: not-a-real-password-just-for-testing\n"
)
# Create package file that uses secrets
package_file = config_dir / "common.yaml"
package_file.write_text(
"wifi:\n ssid: !secret wifi_ssid\n password: !secret wifi_password\n"
)
# Create main device config that includes the package
device_config = config_dir / "test-device.yaml"
device_config.write_text(
"esphome:\n name: test-device\n\npackages:\n common: !include common.yaml\n"
)
# Set up dashboard settings with our test config directory
settings = DashboardSettings()
args = Namespace(
configuration=str(config_dir),
password=None,
username=None,
ha_addon=False,
verbose=False,
)
settings.parse_args(args)
# Verify that CORE.config_path.parent correctly points to the config directory
# This is critical for secret resolution in yaml_util.py which does:
# main_config_dir = CORE.config_path.parent
# main_secret_yml = main_config_dir / "secrets.yaml"
assert CORE.config_path.parent == config_dir.resolve()
assert (CORE.config_path.parent / "secrets.yaml").exists()
assert (CORE.config_path.parent / "common.yaml").exists()
# Verify that CORE.config_path itself uses the sentinel file
assert CORE.config_path.name == "___DASHBOARD_SENTINEL___.yaml"
assert not CORE.config_path.exists() # Sentinel file doesn't actually exist
@pytest.fixture
def auth_settings(dashboard_settings: DashboardSettings) -> DashboardSettings:
"""Create DashboardSettings with auth configured, based on dashboard_settings."""
dashboard_settings.username = "admin"
dashboard_settings.using_password = True
dashboard_settings.password_hash = password_hash("correctpassword")
return dashboard_settings
def test_check_password_correct_credentials(auth_settings: DashboardSettings) -> None:
"""Test check_password returns True for correct username and password."""
assert auth_settings.check_password("admin", "correctpassword") is True
def test_check_password_wrong_password(auth_settings: DashboardSettings) -> None:
"""Test check_password returns False for wrong password."""
assert auth_settings.check_password("admin", "wrongpassword") is False
def test_check_password_wrong_username(auth_settings: DashboardSettings) -> None:
"""Test check_password returns False for wrong username."""
assert auth_settings.check_password("notadmin", "correctpassword") is False
def test_check_password_both_wrong(auth_settings: DashboardSettings) -> None:
"""Test check_password returns False when both are wrong."""
assert auth_settings.check_password("notadmin", "wrongpassword") is False
def test_check_password_no_auth(dashboard_settings: DashboardSettings) -> None:
"""Test check_password returns True when auth is not configured."""
assert dashboard_settings.check_password("anyone", "anything") is True
def test_check_password_non_ascii_username(
dashboard_settings: DashboardSettings,
) -> None:
"""Test check_password handles non-ASCII usernames without TypeError."""
dashboard_settings.username = "\u00e9l\u00e8ve"
dashboard_settings.using_password = True
dashboard_settings.password_hash = password_hash("pass")
assert dashboard_settings.check_password("\u00e9l\u00e8ve", "pass") is True
assert dashboard_settings.check_password("\u00e9l\u00e8ve", "wrong") is False
assert dashboard_settings.check_password("other", "pass") is False
def test_check_password_ha_addon_no_password(
dashboard_settings: DashboardSettings,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test check_password doesn't crash in HA add-on mode without a password.
In HA add-on mode, using_ha_addon_auth can be True while using_password
is False, leaving password_hash as b"". This must not raise TypeError
in hmac.compare_digest.
"""
monkeypatch.delenv("DISABLE_HA_AUTHENTICATION", raising=False)
dashboard_settings.on_ha_addon = True
dashboard_settings.using_password = False
# password_hash stays as default b""
assert dashboard_settings.check_password("anyone", "anything") is False

File diff suppressed because it is too large Load Diff

View File

@@ -1,219 +0,0 @@
"""Tests for dashboard web_server Path-related functionality."""
from __future__ import annotations
import gzip
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
from esphome.dashboard import web_server
def test_get_base_frontend_path_production() -> None:
"""Test get_base_frontend_path in production mode."""
mock_module = MagicMock()
mock_module.where.return_value = Path("/usr/local/lib/esphome_dashboard")
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
):
result = web_server.get_base_frontend_path()
assert result == Path("/usr/local/lib/esphome_dashboard")
mock_module.where.assert_called_once()
def test_get_base_frontend_path_dev_mode() -> None:
"""Test get_base_frontend_path in development mode."""
test_path = "/home/user/esphome/dashboard"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses Path.resolve() which resolves symlinks
# The actual function adds "/" to the path, so we simulate that
test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/"
expected = (Path.cwd() / test_path_with_slash / "esphome_dashboard").resolve()
assert result == expected
def test_get_base_frontend_path_dev_mode_with_trailing_slash() -> None:
"""Test get_base_frontend_path in dev mode with trailing slash."""
test_path = "/home/user/esphome/dashboard/"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses Path.resolve() which resolves symlinks
expected = (Path.cwd() / test_path / "esphome_dashboard").resolve()
assert result == expected
def test_get_base_frontend_path_dev_mode_relative_path() -> None:
"""Test get_base_frontend_path with relative dev path."""
test_path = "./dashboard"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses Path.resolve() which resolves symlinks
# The actual function adds "/" to the path, so we simulate that
test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/"
expected = (Path.cwd() / test_path_with_slash / "esphome_dashboard").resolve()
assert result == expected
assert result.is_absolute()
def test_get_static_path_single_component() -> None:
"""Test get_static_path with single path component."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
result = web_server.get_static_path("file.js")
assert result == Path("/base/frontend") / "static" / "file.js"
def test_get_static_path_multiple_components() -> None:
"""Test get_static_path with multiple path components."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
result = web_server.get_static_path("js", "esphome", "index.js")
assert (
result == Path("/base/frontend") / "static" / "js" / "esphome" / "index.js"
)
def test_get_static_path_empty_args() -> None:
"""Test get_static_path with no arguments."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
result = web_server.get_static_path()
assert result == Path("/base/frontend") / "static"
def test_get_static_path_with_pathlib_path() -> None:
"""Test get_static_path with Path objects."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
path_obj = Path("js") / "app.js"
result = web_server.get_static_path(str(path_obj))
assert result == Path("/base/frontend") / "static" / "js" / "app.js"
def test_get_static_file_url_production() -> None:
"""Test get_static_file_url in production mode."""
web_server.get_static_file_url.cache_clear()
mock_module = MagicMock()
mock_path = MagicMock(spec=Path)
mock_path.read_bytes.return_value = b"test content"
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
patch("esphome.dashboard.web_server.get_static_path") as mock_get_path,
):
mock_get_path.return_value = mock_path
result = web_server.get_static_file_url("js/app.js")
assert result.startswith("./static/js/app.js?hash=")
def test_get_static_file_url_dev_mode() -> None:
"""Test get_static_file_url in development mode."""
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": "/dev/path"}):
web_server.get_static_file_url.cache_clear()
result = web_server.get_static_file_url("js/app.js")
assert result == "./static/js/app.js"
def test_get_static_file_url_index_js_special_case() -> None:
"""Test get_static_file_url replaces index.js with entrypoint."""
web_server.get_static_file_url.cache_clear()
mock_module = MagicMock()
mock_module.entrypoint.return_value = "main.js"
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
):
result = web_server.get_static_file_url("js/esphome/index.js")
assert result == "./static/js/esphome/main.js"
def test_load_file_path(tmp_path: Path) -> None:
"""Test loading a file."""
test_file = tmp_path / "test.txt"
test_file.write_bytes(b"test content")
with test_file.open("rb") as f:
content = f.read()
assert content == b"test content"
def test_load_file_compressed_path(tmp_path: Path) -> None:
"""Test loading a compressed file."""
test_file = tmp_path / "test.txt.gz"
with gzip.open(test_file, "wb") as gz:
gz.write(b"compressed content")
with gzip.open(test_file, "rb") as gz:
content = gz.read()
assert content == b"compressed content"
def test_path_normalization_in_static_path() -> None:
"""Test that paths are normalized correctly."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
# Test with separate components
result1 = web_server.get_static_path("js", "app.js")
result2 = web_server.get_static_path("js", "app.js")
assert result1 == result2
assert result1 == Path("/base/frontend") / "static" / "js" / "app.js"
def test_windows_path_handling() -> None:
"""Test handling of Windows-style paths."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path(r"C:\Program Files\esphome\frontend")
result = web_server.get_static_path("js", "app.js")
# Path should handle this correctly on the platform
expected = (
Path(r"C:\Program Files\esphome\frontend") / "static" / "js" / "app.js"
)
assert result == expected
def test_path_with_special_characters() -> None:
"""Test paths with special characters."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/frontend")
result = web_server.get_static_path("js-modules", "app_v1.0.js")
assert (
result == Path("/base/frontend") / "static" / "js-modules" / "app_v1.0.js"
)
def test_path_with_spaces() -> None:
"""Test paths with spaces."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = Path("/base/my frontend")
result = web_server.get_static_path("my js", "my app.js")
assert result == Path("/base/my frontend") / "static" / "my js" / "my app.js"

View File

@@ -562,7 +562,7 @@ def test_determine_integration_tests(
with patch.object( with patch.object(
determine_jobs, determine_jobs,
"changed_files", "changed_files",
return_value=["esphome/dashboard/web_server.py"], return_value=["esphome/analyze_memory/helpers.py"],
): ):
run_all, test_files = determine_jobs.determine_integration_tests() run_all, test_files = determine_jobs.determine_integration_tests()
assert run_all is False assert run_all is False
@@ -914,7 +914,6 @@ def test_should_run_core_ci_with_branch() -> None:
# picks them up because esphome's pyproject sets # picks them up because esphome's pyproject sets
# include-package-data = true. # include-package-data = true.
(["esphome/idf_component.yml"], True), (["esphome/idf_component.yml"], True),
(["esphome/dashboard/templates/index.html"], True),
(["esphome/components/api/api_pb2_service.json"], True), (["esphome/components/api/api_pb2_service.json"], True),
# Mixed: any triggering file is enough # Mixed: any triggering file is enough
(["docs/README.md", "esphome/config.py"], True), (["docs/README.md", "esphome/config.py"], True),

View File

@@ -121,22 +121,6 @@ def test_friendly_name_slugify(value, expected):
assert helpers.friendly_name_slugify(value) == expected assert helpers.friendly_name_slugify(value) == expected
def test_friendly_name_slugify_back_compat_shim():
"""``esphome.dashboard.util.text`` keeps re-exporting for back-compat.
The function moved to ``esphome.helpers`` so the new
device-builder dashboard backend can import it without depending
on the legacy dashboard package, but downstream code that still
imports from the old path keeps working until the dashboard
module is removed.
"""
from esphome.dashboard.util.text import (
friendly_name_slugify as legacy_friendly_name_slugify,
)
assert legacy_friendly_name_slugify is helpers.friendly_name_slugify
@pytest.mark.parametrize( @pytest.mark.parametrize(
"host", "host",
( (

View File

@@ -33,6 +33,7 @@ from esphome.__main__ import (
command_clean_all, command_clean_all,
command_config, command_config,
command_config_hash, command_config_hash,
command_dashboard,
command_idedata, command_idedata,
command_rename, command_rename,
command_run, command_run,
@@ -3740,6 +3741,45 @@ def test_command_wizard(tmp_path: Path) -> None:
mock_wizard.assert_called_once_with(config_file) mock_wizard.assert_called_once_with(config_file)
def test_command_dashboard_errors_with_device_builder_redirect() -> None:
"""The removed dashboard command points users to ESPHome Device Builder."""
args = MockArgs()
with pytest.raises(EsphomeError, match="esphome-device-builder"):
command_dashboard(args)
@pytest.mark.parametrize(
"argv",
[
["esphome", "dashboard"],
["esphome", "dashboard", "/config"],
# Legacy flags must be accepted so old invocations reach the redirect
# instead of failing on argparse "unrecognized arguments".
["esphome", "dashboard", "--port", "6052", "/config"],
["esphome", "dashboard", "--username", "u", "--password", "p", "--open-ui"],
[
"esphome",
"dashboard",
"--address",
"0.0.0.0",
"--socket",
"/x",
"--ha-addon",
],
],
)
def test_run_esphome_dashboard_redirects_to_device_builder(
argv: list[str],
caplog: pytest.LogCaptureFixture,
) -> None:
"""`esphome dashboard` still parses but fails with the redirect message."""
result = run_esphome(argv)
assert result == 1
assert "esphome-device-builder" in caplog.text
def test_command_config_hash( def test_command_config_hash(
tmp_path: Path, tmp_path: Path,
capfd: CaptureFixture[str], capfd: CaptureFixture[str],