From ae74920b814ad555a15002d12675d057b040e0e5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 26 May 2026 00:14:42 -0500 Subject: [PATCH] [core] Enable ruff PTH (flake8-use-pathlib) lint family (#16661) --- esphome/__main__.py | 4 +- esphome/analyze_memory/cli.py | 6 +-- esphome/analyze_memory/toolchain.py | 3 +- esphome/build_gen/espidf.py | 2 +- esphome/bundle.py | 2 +- esphome/components/audio_file/__init__.py | 2 +- esphome/components/bme68x_bsec2/__init__.py | 2 +- .../esp32_hosted/update/__init__.py | 4 +- esphome/components/http_request/__init__.py | 4 +- esphome/components/image/__init__.py | 2 +- .../components/micro_wake_word/__init__.py | 4 +- esphome/components/nrf52/ota.py | 2 +- esphome/components/rp2040/generate_boards.py | 4 +- esphome/components/web_server/__init__.py | 4 +- esphome/components/zigbee/zigbee_esp32.py | 5 +-- esphome/dashboard/dashboard.py | 3 +- esphome/dashboard/web_server.py | 6 +-- esphome/espidf/component.py | 18 ++++---- esphome/espidf/extra_script.py | 2 +- esphome/espidf/framework.py | 44 +++++++++++-------- esphome/espidf/get_idf_tool_paths.py | 3 +- esphome/espidf/runner.py | 3 +- esphome/espidf/toolchain.py | 19 ++++---- esphome/espota2.py | 2 +- esphome/helpers.py | 4 +- esphome/mqtt.py | 6 +-- esphome/web_server_ota.py | 2 +- pyproject.toml | 1 + script/api_protobuf/api_protobuf.py | 12 ++--- script/build_helpers.py | 2 +- script/bump-version.py | 5 ++- script/ci-custom.py | 2 +- script/ci_add_metadata_to_json.py | 4 +- script/ci_helpers.py | 3 +- script/ci_memory_impact_comment.py | 2 +- script/ci_memory_impact_extract.py | 4 +- script/clang-format | 3 +- script/clang-tidy | 9 ++-- script/clang_tidy_hash.py | 6 +-- script/determine-jobs.py | 2 +- script/helpers.py | 19 ++++---- script/lint-python | 6 ++- script/sync-device_class.py | 5 ++- script/test_build_components.py | 2 +- tests/dashboard/test_web_server_paths.py | 10 ++--- tests/integration/conftest.py | 2 +- tests/script/test_check_import_time.py | 7 +-- tests/script/test_determine_jobs.py | 7 +-- tests/script/test_helpers.py | 4 +- tests/script/test_test_helpers.py | 5 +-- tests/unit_tests/core/test_config.py | 20 ++++----- tests/unit_tests/test_espidf_component.py | 2 +- tests/unit_tests/test_substitutions.py | 3 +- tests/unit_tests/test_writer.py | 8 ++-- 54 files changed, 162 insertions(+), 155 deletions(-) diff --git a/esphome/__main__.py b/esphome/__main__.py index 268164acf6..5f281ce832 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -794,7 +794,7 @@ def _check_and_emit_build_info() -> None: # Read build_info from JSON try: - with open(build_info_json_path, encoding="utf-8") as f: + with build_info_json_path.open(encoding="utf-8") as f: build_info = json.load(f) except (OSError, json.JSONDecodeError) as e: _LOGGER.debug("Failed to read build_info: %s", e) @@ -1056,7 +1056,7 @@ def _wait_for_serial_port( def _port_found() -> bool: if port is not None: if os.name == "posix": - return os.path.exists(port) + return Path(port).exists() return any(p.path == port for p in get_serial_ports()) ports = get_serial_ports() if known_ports is not None: diff --git a/esphome/analyze_memory/cli.py b/esphome/analyze_memory/cli.py index a856e2988d..4fbceb7e5e 100644 --- a/esphome/analyze_memory/cli.py +++ b/esphome/analyze_memory/cli.py @@ -6,6 +6,7 @@ from collections import defaultdict from collections.abc import Callable import heapq from operator import itemgetter +from pathlib import Path import sys from typing import TYPE_CHECKING @@ -699,7 +700,7 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): content = "\n".join(lines) if output_file: - with open(output_file, "w", encoding="utf-8") as f: + with Path(output_file).open("w", encoding="utf-8") as f: f.write(content) else: print(content) @@ -737,7 +738,6 @@ def main(): # Load build directory import json - from pathlib import Path from esphome.platformio.toolchain import IDEData @@ -785,7 +785,7 @@ def main(): if not idedata_path.exists(): continue try: - with open(idedata_path, encoding="utf-8") as f: + with idedata_path.open(encoding="utf-8") as f: raw_data = json.load(f) idedata = IDEData(raw_data) print(f"Loaded idedata from: {idedata_path}", file=sys.stderr) diff --git a/esphome/analyze_memory/toolchain.py b/esphome/analyze_memory/toolchain.py index 3a8a5f7be4..a724d52f25 100644 --- a/esphome/analyze_memory/toolchain.py +++ b/esphome/analyze_memory/toolchain.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging -import os from pathlib import Path import subprocess from typing import TYPE_CHECKING @@ -37,7 +36,7 @@ def _find_in_platformio_packages(tool_name: str) -> str | None: Full path to the tool or None if not found """ # Get PlatformIO packages directory - platformio_home = Path(os.path.expanduser("~/.platformio/packages")) + platformio_home = Path("~/.platformio/packages").expanduser() if not platformio_home.exists(): return None diff --git a/esphome/build_gen/espidf.py b/esphome/build_gen/espidf.py index 96f84ebbd1..0b50f72382 100644 --- a/esphome/build_gen/espidf.py +++ b/esphome/build_gen/espidf.py @@ -24,7 +24,7 @@ def get_available_components() -> list[str] | None: return None try: - with open(project_desc, encoding="utf-8") as f: + with project_desc.open(encoding="utf-8") as f: data = json.load(f) component_info = data.get("build_component_info", {}) diff --git a/esphome/bundle.py b/esphome/bundle.py index 4537cbce9d..d38f68ebfd 100644 --- a/esphome/bundle.py +++ b/esphome/bundle.py @@ -412,7 +412,7 @@ class ConfigBundleCreator: @staticmethod def _add_to_tar(tar: tarfile.TarFile, bf: BundleFile) -> None: """Add a BundleFile to the tar archive with deterministic metadata.""" - with open(bf.source, "rb") as f: + with bf.source.open("rb") as f: _add_bytes_to_tar(tar, bf.path, f.read()) diff --git a/esphome/components/audio_file/__init__.py b/esphome/components/audio_file/__init__.py index 23c90e9b76..8dc546cec1 100644 --- a/esphome/components/audio_file/__init__.py +++ b/esphome/components/audio_file/__init__.py @@ -98,7 +98,7 @@ def read_audio_file_and_type(file_config: ConfigType) -> tuple[bytes, MockObj]: else: raise cv.Invalid("Unsupported file source") - with open(path, "rb") as f: + with path.open("rb") as f: data = f.read() try: diff --git a/esphome/components/bme68x_bsec2/__init__.py b/esphome/components/bme68x_bsec2/__init__.py index 5083d283ef..62cd9e2e36 100644 --- a/esphome/components/bme68x_bsec2/__init__.py +++ b/esphome/components/bme68x_bsec2/__init__.py @@ -169,7 +169,7 @@ async def to_code_base(config): path = _compute_local_file_path(_compute_url(config)) try: - with open(path, encoding="utf-8") as f: + with path.open(encoding="utf-8") as f: bsec2_iaq_config = f.read() except Exception as e: raise core.EsphomeError( diff --git a/esphome/components/esp32_hosted/update/__init__.py b/esphome/components/esp32_hosted/update/__init__.py index b258a26b08..202df21ab5 100644 --- a/esphome/components/esp32_hosted/update/__init__.py +++ b/esphome/components/esp32_hosted/update/__init__.py @@ -75,7 +75,7 @@ def _validate_firmware(config: dict[str, Any]) -> None: return path = CORE.relative_config_path(config[CONF_PATH]) - with open(path, "rb") as f: + with path.open("rb") as f: firmware_data = f.read() calculated = hashlib.sha256(firmware_data).hexdigest() expected = config[CONF_SHA256].lower() @@ -93,7 +93,7 @@ async def to_code(config: dict[str, Any]) -> None: if config[CONF_TYPE] == TYPE_EMBEDDED: path = config[CONF_PATH] - with open(CORE.relative_config_path(path), "rb") as f: + with CORE.relative_config_path(path).open("rb") as f: firmware_data = f.read() rhs = [HexInt(x) for x in firmware_data] arr_id = ID(f"{config[CONF_ID]}_data", is_declaration=True, type=cg.uint8) diff --git a/esphome/components/http_request/__init__.py b/esphome/components/http_request/__init__.py index 90879c459e..2617951f0d 100644 --- a/esphome/components/http_request/__init__.py +++ b/esphome/components/http_request/__init__.py @@ -1,3 +1,5 @@ +from pathlib import Path + from esphome import automation import esphome.codegen as cg from esphome.components import esp32 @@ -174,7 +176,7 @@ async def to_code(config): if config.get(CONF_VERIFY_SSL): if ca_cert_path := config.get(CONF_CA_CERTIFICATE_PATH): - with open(ca_cert_path, encoding="utf-8") as f: + with Path(ca_cert_path).open(encoding="utf-8") as f: ca_cert_content = f.read() cg.add(var.set_ca_certificate(ca_cert_content)) else: diff --git a/esphome/components/image/__init__.py b/esphome/components/image/__init__.py index 365554f7d2..2fefbdcd58 100644 --- a/esphome/components/image/__init__.py +++ b/esphome/components/image/__init__.py @@ -395,7 +395,7 @@ def download_image(value): def is_svg_file(file): if not file: return False - with open(file, "rb") as f: + with Path(file).open("rb") as f: return " tuple[dict, dict]: for json_file in sorted(json_dir.glob("*.json")): board_name = json_file.stem - with open(json_file, encoding="utf-8") as f: + with json_file.open(encoding="utf-8") as f: data = json.load(f) build = data.get("build", {}) @@ -136,7 +136,7 @@ def _get_variant(json_file: Path) -> str | None: """Get variant name from a board JSON file.""" if not json_file.exists(): return None - with open(json_file, encoding="utf-8") as f: + with json_file.open(encoding="utf-8") as f: data = json.load(f) return data.get("build", {}).get("variant") diff --git a/esphome/components/web_server/__init__.py b/esphome/components/web_server/__init__.py index 84910b6f90..99a9b7518c 100644 --- a/esphome/components/web_server/__init__.py +++ b/esphome/components/web_server/__init__.py @@ -326,12 +326,12 @@ async def to_code(config): if CONF_CSS_INCLUDE in config: cg.add_define("USE_WEBSERVER_CSS_INCLUDE") path = CORE.relative_config_path(config[CONF_CSS_INCLUDE]) - with open(file=path, encoding="utf-8") as css_file: + with path.open(encoding="utf-8") as css_file: add_resource_as_progmem("CSS_INCLUDE", css_file.read()) if CONF_JS_INCLUDE in config: cg.add_define("USE_WEBSERVER_JS_INCLUDE") path = CORE.relative_config_path(config[CONF_JS_INCLUDE]) - with open(file=path, encoding="utf-8") as js_file: + with path.open(encoding="utf-8") as js_file: add_resource_as_progmem("JS_INCLUDE", js_file.read()) cg.add(var.set_include_internal(config[CONF_INCLUDE_INTERNAL])) if CONF_LOCAL in config and config[CONF_LOCAL]: diff --git a/esphome/components/zigbee/zigbee_esp32.py b/esphome/components/zigbee/zigbee_esp32.py index 89efd583ab..a0fadbce8b 100644 --- a/esphome/components/zigbee/zigbee_esp32.py +++ b/esphome/components/zigbee/zigbee_esp32.py @@ -129,9 +129,8 @@ def final_validate_esp32(config: ConfigType) -> ConfigType: if CONF_PARTITIONS in fv.full_config.get() and not isinstance( fv.full_config.get()[CONF_PARTITIONS], list ): - with open( - CORE.relative_config_path(fv.full_config.get()[CONF_PARTITIONS]), - encoding="utf8", + with CORE.relative_config_path(fv.full_config.get()[CONF_PARTITIONS]).open( + encoding="utf8" ) as f: partitions_tab = f.read() for partition, types in [ diff --git a/esphome/dashboard/dashboard.py b/esphome/dashboard/dashboard.py index 81c10763e7..7fc21f8a44 100644 --- a/esphome/dashboard/dashboard.py +++ b/esphome/dashboard/dashboard.py @@ -6,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor import contextlib import logging import os +from pathlib import Path import socket import threading from time import monotonic @@ -149,4 +150,4 @@ async def async_start(args) -> None: await dashboard.async_run() finally: if sock: - os.remove(sock) + Path(sock).unlink() diff --git a/esphome/dashboard/web_server.py b/esphome/dashboard/web_server.py index 88b454e5cf..97d6639c1f 100644 --- a/esphome/dashboard/web_server.py +++ b/esphome/dashboard/web_server.py @@ -1040,7 +1040,7 @@ class DownloadListRequestHandler(BaseHandler): class DownloadBinaryRequestHandler(BaseHandler): def _load_file(self, path: str, compressed: bool) -> bytes: """Load a file from disk and compress it if requested.""" - with open(path, "rb") as f: + with Path(path).open("rb") as f: data = f.read() if compressed: return gzip.compress(data, 9) @@ -1292,7 +1292,7 @@ class EditRequestHandler(BaseHandler): def _read_file(self, filename: str, configuration: str) -> bytes | None: """Read a file and return the content as bytes.""" try: - with open(file=filename, encoding="utf-8") as f: + with Path(filename).open(encoding="utf-8") as f: return f.read() except FileNotFoundError: if configuration in const.SECRETS_FILES: @@ -1493,7 +1493,7 @@ def get_base_frontend_path() -> Path: static_path += "/" # This path can be relative, so resolve against the root or else templates don't work - path = Path(os.getcwd()) / static_path / "esphome_dashboard" + path = Path.cwd() / static_path / "esphome_dashboard" return path.resolve() diff --git a/esphome/espidf/component.py b/esphome/espidf/component.py index 81f2cd9632..050002d9e2 100644 --- a/esphome/espidf/component.py +++ b/esphome/espidf/component.py @@ -317,24 +317,26 @@ def _collect_filtered_files(src_dir: PathType, src_filters: list[str]) -> list[s if pattern.endswith("/"): pattern = pattern.rstrip("/") + "/**" - full_pattern = os.path.join(glob.escape(str(src_dir)), pattern) + # glob.escape has no pathlib equivalent and the matcher works on raw + # path strings, so PTH118/PTH207 don't apply here. + full_pattern = os.path.join(glob.escape(str(src_dir)), pattern) # noqa: PTH118 matched = [] - for item in glob.glob(full_pattern, recursive=True): - if not os.path.isdir(item): + for item in glob.glob(full_pattern, recursive=True): # noqa: PTH207 + if not Path(item).is_dir(): matched.append(item) else: # PlatformIO quirk: a directory matched with "*" should include all its # nested files and subdirectories, not just the directory itself. for root, _, files in os.walk(item): - matched.extend([os.path.join(root, f) for f in files]) + matched.extend([str(Path(root) / f) for f in files]) if sign == "+": selected.update(matched) elif sign == "-": selected.difference_update(matched) - return [r for r in selected if os.path.isfile(r)] + return [r for r in selected if Path(r).is_file()] def _convert_library_to_component(library: Library) -> IDFComponent: @@ -486,7 +488,7 @@ def generate_cmakelists_txt(component: IDFComponent) -> str: # Only keep sources build_src_files = [os.path.relpath(p, component.path) for p in build_src_files] build_src_files = [ - f for f in build_src_files if os.path.splitext(f)[1] in SRC_FILE_EXTENSIONS + f for f in build_src_files if Path(f).suffix in SRC_FILE_EXTENSIONS ] # Handle build flags @@ -740,7 +742,7 @@ def _parse_library_json(library_json_path: PathType): Returns: dict: Parsed JSON content as a Python dictionary. """ - with open(library_json_path, encoding="utf8") as fp: + with Path(library_json_path).open(encoding="utf8") as fp: return json.load(fp) @@ -754,7 +756,7 @@ def _parse_library_properties(library_properties_path: PathType): Returns: dict[str, str]: Mapping of parsed property keys to values. """ - with open(library_properties_path, encoding="utf8") as fp: + with Path(library_properties_path).open(encoding="utf8") as fp: data = {} for line in fp.read().splitlines(): line = line.strip() diff --git a/esphome/espidf/extra_script.py b/esphome/espidf/extra_script.py index 2f22f23c10..bead63ca21 100644 --- a/esphome/espidf/extra_script.py +++ b/esphome/espidf/extra_script.py @@ -108,7 +108,7 @@ def run_extra_script( """ env = _FakeSConsEnv(board_mcu=idf_target, pio_env=f"esphome_{idf_target}") code = compile(script_path.read_text(), str(script_path), "exec") - old_cwd = os.getcwd() + old_cwd = Path.cwd() try: os.chdir(library_dir) exec( # noqa: S102 pylint: disable=exec-used diff --git a/esphome/espidf/framework.py b/esphome/espidf/framework.py index f393600732..331c2f84b0 100644 --- a/esphome/espidf/framework.py +++ b/esphome/espidf/framework.py @@ -138,7 +138,7 @@ def rmdir(directory: PathType, msg: str | None = None): Raises: RuntimeError: If directory removal fails """ - if os.path.isdir(directory): + if Path(directory).is_dir(): try: if msg: _LOGGER.debug(msg) @@ -192,7 +192,7 @@ def _check_stamp(file: PathType, data: dict[str, str]) -> bool: return False try: - with open(file, encoding="utf-8") as f: + with Path(file).open(encoding="utf-8") as f: return json.load(f) == data except (json.JSONDecodeError, OSError): return False @@ -206,7 +206,7 @@ def _write_stamp(file: PathType, data: dict[str, str]): file: Path to the stamp file to write data: Dictionary containing data to write """ - with open(file, "w", encoding="utf8") as fp: + with Path(file).open("w", encoding="utf8") as fp: json.dump(data, fp) @@ -471,8 +471,12 @@ def _tar_extract_all( import stat import tarfile + # Tar extraction safety: os.path.realpath / commonpath / normpath have no + # pathlib equivalents and Path.resolve() would follow symlinks unsafely. + # Use os.path for the security-sensitive parts; the simple checks move to + # Path. extract_dir = os.fspath(extract_dir) - abs_dest = os.path.abspath(extract_dir) + abs_dest = os.path.abspath(extract_dir) # noqa: PTH100 with tarfile.open(fileobj=data, mode="r") as tar_ref: all_members = tar_ref.getmembers() @@ -491,8 +495,8 @@ def _tar_extract_all( name = name.lstrip("/" + os.sep) # 2. Reject absolute paths (incl. Windows drive) - if os.path.isabs(name) or ( - os.name == "nt" and ":" in name.split(os.sep)[0] + if Path(name).is_absolute() or ( + os.name == "nt" and ":" in name.split(os.sep)[0] # noqa: PTH206 ): continue @@ -506,7 +510,7 @@ def _tar_extract_all( name = norm[len(strip_prefix) :] # 4. Compute final path - target_path = os.path.realpath(os.path.join(abs_dest, name)) + target_path = os.path.realpath(os.path.join(abs_dest, name)) # noqa: PTH118 if os.path.commonpath([abs_dest, target_path]) != abs_dest: continue @@ -515,18 +519,20 @@ def _tar_extract_all( linkname = member.linkname # Reject absolute link targets - if os.path.isabs(linkname): + if Path(linkname).is_absolute(): continue # Strip leading slashes linkname = os.path.normpath(linkname) if member.issym(): - link_target = os.path.join( - abs_dest, os.path.dirname(name), linkname + link_target = os.path.join( # noqa: PTH118 + abs_dest, + os.path.dirname(name), # noqa: PTH120 + linkname, ) else: - link_target = os.path.join(abs_dest, linkname) + link_target = os.path.join(abs_dest, linkname) # noqa: PTH118 link_target = os.path.realpath(link_target) if os.path.commonpath([abs_dest, link_target]) != abs_dest: @@ -598,7 +604,9 @@ def _zip_extract_all( """ import zipfile - extract_dir = os.path.abspath(extract_dir) + # See note in archive_extract_all_tar: os.path is used intentionally for + # the security-sensitive abspath/commonpath checks below. + extract_dir = os.path.abspath(extract_dir) # noqa: PTH100 with zipfile.ZipFile(data, "r") as zip_ref: all_members = zip_ref.infolist() @@ -618,8 +626,8 @@ def _zip_extract_all( name = member.filename.lstrip("/\\") # 2. Reject absolute paths / Windows drives - if os.path.isabs(name) or ( - os.name == "nt" and ":" in name.split(os.sep)[0] + if Path(name).is_absolute() or ( + os.name == "nt" and ":" in name.split(os.sep)[0] # noqa: PTH206 ): continue @@ -633,7 +641,7 @@ def _zip_extract_all( name = norm[len(strip_prefix) :] # 4. Compute safe target path - target_path = os.path.abspath(os.path.join(extract_dir, name)) + target_path = os.path.abspath(os.path.join(extract_dir, name)) # noqa: PTH100, PTH118 if os.path.commonpath([extract_dir, target_path]) != extract_dir: raise ValueError(f"Unsafe path detected: {member.filename}") @@ -680,7 +688,7 @@ def archive_extract_all( with ExitStack() as stack: archive_ref: io.BufferedIOBase if isinstance(archive, (str, os.PathLike)): - archive_ref = stack.enter_context(open(archive, "rb")) + archive_ref = stack.enter_context(Path(archive).open("rb")) elif isinstance(archive, (io.BufferedReader, io.BufferedRandom)): archive_ref = archive elif isinstance(archive, io.RawIOBase): @@ -727,7 +735,7 @@ def download_from_mirrors( # 1. Open target file for writing if path given with ExitStack() as stack: if isinstance(target, (str, os.PathLike)): - f = stack.enter_context(open(target, "wb")) + f = stack.enter_context(Path(target).open("wb")) elif isinstance(target, (io.RawIOBase, io.IOBase)): f = target else: @@ -917,7 +925,7 @@ def _patch_tools_json_for_linux_arm64(framework_path: Path) -> None: return try: - with open(tools_json, encoding="utf-8") as f: + with tools_json.open(encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError) as e: _LOGGER.warning( diff --git a/esphome/espidf/get_idf_tool_paths.py b/esphome/espidf/get_idf_tool_paths.py index 2e8859631d..7d99e629b1 100644 --- a/esphome/espidf/get_idf_tool_paths.py +++ b/esphome/espidf/get_idf_tool_paths.py @@ -10,6 +10,7 @@ not installed. import json import os +from pathlib import Path import sys from types import SimpleNamespace @@ -25,7 +26,7 @@ from idf_tools import ( g.idf_path = sys.argv[1] g.idf_tools_path = os.environ.get("IDF_TOOLS_PATH") -g.tools_json = os.path.join(g.idf_path, TOOLS_FILE) +g.tools_json = str(Path(g.idf_path) / TOOLS_FILE) tools_info = filter_tools_info(IDFEnv.get_idf_env(), load_tools_info()) args = SimpleNamespace(prefer_system=False) diff --git a/esphome/espidf/runner.py b/esphome/espidf/runner.py index 857d16c674..7c568db7be 100644 --- a/esphome/espidf/runner.py +++ b/esphome/espidf/runner.py @@ -91,6 +91,7 @@ def main() -> int: # ---- end sys.path fix-up ----------------------------------------------- import os + from pathlib import Path import re import runpy @@ -229,7 +230,7 @@ def main() -> int: # runpy.run_path does not do this automatically, but idf.py relies # on it to import its sibling modules (python_version_checker, # idf_py_actions, ...). - script_dir = os.path.dirname(os.path.abspath(script_path)) + script_dir = str(Path(script_path).resolve().parent) if script_dir not in sys.path: sys.path.insert(0, script_dir) diff --git a/esphome/espidf/toolchain.py b/esphome/espidf/toolchain.py index ef28575caa..752f582e74 100644 --- a/esphome/espidf/toolchain.py +++ b/esphome/espidf/toolchain.py @@ -241,20 +241,21 @@ def has_outdated_files(): dependency_lock_path = CORE.relative_build_path("dependencies.lock") build_ninja_path = CORE.relative_build_path("build/build.ninja") - if not os.path.isdir(build_config_path) or not os.listdir(build_config_path): + if not build_config_path.is_dir() or not any(build_config_path.iterdir()): return True - if not os.path.isfile(cmakecache_txt_path): + if not cmakecache_txt_path.is_file(): return True - if not os.path.isfile(build_ninja_path): + if not build_ninja_path.is_file(): return True - if os.path.isfile(dependency_lock_path) and os.path.getmtime( - dependency_lock_path - ) > os.path.getmtime(build_ninja_path): + if ( + dependency_lock_path.is_file() + and dependency_lock_path.stat().st_mtime > build_ninja_path.stat().st_mtime + ): return True - cmakecache_txt_mtime = os.path.getmtime(cmakecache_txt_path) + cmakecache_txt_mtime = cmakecache_txt_path.stat().st_mtime return any( - os.path.getmtime(f) > cmakecache_txt_mtime + f.stat().st_mtime > cmakecache_txt_mtime for f in [sdkconfig_internal_path, idf_component_yml_path] if f.exists() ) @@ -452,7 +453,7 @@ def create_factory_bin() -> bool: return False try: - with open(flasher_args_path, encoding="utf-8") as f: + with flasher_args_path.open(encoding="utf-8") as f: flash_data = json.load(f) except (json.JSONDecodeError, OSError) as e: _LOGGER.error("Failed to read flasher_args.json: %s", e) diff --git a/esphome/espota2.py b/esphome/espota2.py index 701a125bcd..266702c142 100644 --- a/esphome/espota2.py +++ b/esphome/espota2.py @@ -517,7 +517,7 @@ def run_ota_impl_( continue _LOGGER.info("Connected to %s", sa[0]) - with open(filename, "rb") as file_handle: + with Path(filename).open("rb") as file_handle: try: perform_ota(sock, password, file_handle, filename, ota_type) except OTAError as err: diff --git a/esphome/helpers.py b/esphome/helpers.py index d7ddb5c416..733474c9c9 100644 --- a/esphome/helpers.py +++ b/esphome/helpers.py @@ -385,7 +385,7 @@ def rmtree(path: Path | str) -> None: def _onerror(func, path, exc_info): if os.access(path, os.W_OK): raise exc_info[1].with_traceback(exc_info[2]) - os.chmod(path, stat.S_IWUSR | stat.S_IRUSR) + Path(path).chmod(stat.S_IWUSR | stat.S_IRUSR) func(path) # ``onerror`` is deprecated in 3.12 in favour of ``onexc`` (different @@ -512,7 +512,7 @@ def copy_file_if_changed(src: Path, dst: Path) -> bool: # -> delete file (it would be overwritten anyway), and try again # if that fails, use normal error handler with suppress(OSError): - os.unlink(dst) + Path(dst).unlink() shutil.copyfile(src, dst) return True diff --git a/esphome/mqtt.py b/esphome/mqtt.py index 098292f599..d6bde0cbfd 100644 --- a/esphome/mqtt.py +++ b/esphome/mqtt.py @@ -2,7 +2,7 @@ import contextlib from datetime import datetime import json import logging -import os +from pathlib import Path import ssl import tempfile import time @@ -120,8 +120,8 @@ def prepare( key_file.close() context.load_cert_chain(cert_file.name, key_file.name) finally: - os.unlink(cert_file.name) - os.unlink(key_file.name) + Path(cert_file.name).unlink() + Path(key_file.name).unlink() client.tls_set_context(context) try: diff --git a/esphome/web_server_ota.py b/esphome/web_server_ota.py index 7c31c1b123..8d0fdeecff 100644 --- a/esphome/web_server_ota.py +++ b/esphome/web_server_ota.py @@ -126,7 +126,7 @@ def _try_upload( _LOGGER.info("Connecting to %s port %s...", ip, port) try: - with open(filename, "rb") as fh: + with filename.open("rb") as fh: streamer = _MultipartStreamer(fh, file_size, filename.name) try: response = requests.post( diff --git a/pyproject.toml b/pyproject.toml index c6d96560d5..ae1bd34f60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,7 @@ select = [ "NPY", # numpy-specific rules "PERF", # performance "PL", # pylint + "PTH", # flake8-use-pathlib "PYI", # flake8-pyi "Q", # flake8-quotes "RSE", # flake8-raise diff --git a/script/api_protobuf/api_protobuf.py b/script/api_protobuf/api_protobuf.py index 1cc8e1ec98..240ee7890f 100755 --- a/script/api_protobuf/api_protobuf.py +++ b/script/api_protobuf/api_protobuf.py @@ -3163,7 +3163,7 @@ def main() -> None: defines_content += "\n" defines_content += "\nnamespace esphome::api {} // namespace esphome::api\n" - with open(root / "api_pb2_defines.h", "w", encoding="utf-8") as f: + with (root / "api_pb2_defines.h").open("w", encoding="utf-8") as f: f.write(defines_content) content = FILE_HEADER @@ -3448,13 +3448,13 @@ static void dump_bytes_field(DumpBuffer &out, const char *field_name, const uint #endif // HAS_PROTO_MESSAGE_DUMP """ - with open(root / "api_pb2.h", "w", encoding="utf-8") as f: + with (root / "api_pb2.h").open("w", encoding="utf-8") as f: f.write(content) - with open(root / "api_pb2.cpp", "w", encoding="utf-8") as f: + with (root / "api_pb2.cpp").open("w", encoding="utf-8") as f: f.write(cpp) - with open(root / "api_pb2_dump.cpp", "w", encoding="utf-8") as f: + with (root / "api_pb2_dump.cpp").open("w", encoding="utf-8") as f: f.write(dump_cpp) hpp = FILE_HEADER @@ -3641,10 +3641,10 @@ static const char *const TAG = "api.service"; } // namespace esphome::api """ - with open(root / "api_pb2_service.h", "w", encoding="utf-8") as f: + with (root / "api_pb2_service.h").open("w", encoding="utf-8") as f: f.write(hpp) - with open(root / "api_pb2_service.cpp", "w", encoding="utf-8") as f: + with (root / "api_pb2_service.cpp").open("w", encoding="utf-8") as f: f.write(cpp) prot_file.unlink() diff --git a/script/build_helpers.py b/script/build_helpers.py index fa722aa099..52f7ee317e 100644 --- a/script/build_helpers.py +++ b/script/build_helpers.py @@ -195,7 +195,7 @@ def load_component_yaml_configs(components: list[str], tests_dir: Path) -> dict: yaml_path = tests_dir / component / BENCHMARK_YAML_FILENAME if not yaml_path.is_file(): continue - with open(yaml_path) as f: + with yaml_path.open() as f: component_config = yaml.safe_load(f) if component_config and isinstance(component_config, dict): for key, value in component_config.items(): diff --git a/script/bump-version.py b/script/bump-version.py index ed927cb991..e09fc87c60 100755 --- a/script/bump-version.py +++ b/script/bump-version.py @@ -2,6 +2,7 @@ import argparse from dataclasses import dataclass +from pathlib import Path import re import sys @@ -39,12 +40,12 @@ class Version: def sub(path, pattern, repl, expected_count=1): - with open(path, encoding="utf-8") as fh: + with Path(path).open(encoding="utf-8") as fh: content = fh.read() content, count = re.subn(pattern, repl, content, flags=re.MULTILINE) if expected_count is not None: assert count == expected_count, f"Pattern {pattern} replacement failed!" - with open(path, "w", encoding="utf-8") as fh: + with Path(path).open("w", encoding="utf-8") as fh: fh.write(content) diff --git a/script/ci-custom.py b/script/ci-custom.py index f2a9681be5..1ac13e18f7 100755 --- a/script/ci-custom.py +++ b/script/ci-custom.py @@ -14,7 +14,7 @@ import time import colorama from helpers import filter_changed, git_ls_files, print_error_for_file, styled -sys.path.append(os.path.dirname(__file__)) +sys.path.append(str(Path(__file__).parent)) def find_all(a_str, sub): diff --git a/script/ci_add_metadata_to_json.py b/script/ci_add_metadata_to_json.py index 687b5131c0..e884e9a64c 100755 --- a/script/ci_add_metadata_to_json.py +++ b/script/ci_add_metadata_to_json.py @@ -44,7 +44,7 @@ def main() -> int: return 1 try: - with open(json_path, encoding="utf-8") as f: + with Path(json_path).open(encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError) as e: print(f"Error loading JSON: {e}", file=sys.stderr) @@ -74,7 +74,7 @@ def main() -> int: # Write back try: - with open(json_path, "w", encoding="utf-8") as f: + with Path(json_path).open("w", encoding="utf-8") as f: json.dump(data, f, indent=2) print(f"Added metadata to {args.json_file}", file=sys.stderr) except OSError as e: diff --git a/script/ci_helpers.py b/script/ci_helpers.py index 48b0e4bbfe..a51a857ada 100644 --- a/script/ci_helpers.py +++ b/script/ci_helpers.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +from pathlib import Path def write_github_output(outputs: dict[str, str | int]) -> None: @@ -16,7 +17,7 @@ def write_github_output(outputs: dict[str, str | int]) -> None: """ github_output = os.environ.get("GITHUB_OUTPUT") if github_output: - with open(github_output, "a", encoding="utf-8") as f: + with Path(github_output).open("a", encoding="utf-8") as f: f.writelines(f"{key}={value}\n" for key, value in outputs.items()) else: for key, value in outputs.items(): diff --git a/script/ci_memory_impact_comment.py b/script/ci_memory_impact_comment.py index 01316da27f..0908b99595 100755 --- a/script/ci_memory_impact_comment.py +++ b/script/ci_memory_impact_comment.py @@ -91,7 +91,7 @@ def load_analysis_json(json_path: str) -> dict | None: return None try: - with open(json_file, encoding="utf-8") as f: + with Path(json_file).open(encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError) as e: print(f"Failed to load analysis JSON: {e}", file=sys.stderr) diff --git a/script/ci_memory_impact_extract.py b/script/ci_memory_impact_extract.py index 2aa7394b11..feacc2b1af 100755 --- a/script/ci_memory_impact_extract.py +++ b/script/ci_memory_impact_extract.py @@ -127,7 +127,7 @@ def run_detailed_analysis(build_dir: str) -> dict | None: if not idedata_path.exists(): continue try: - with open(idedata_path, encoding="utf-8") as f: + with idedata_path.open(encoding="utf-8") as f: raw_data = json.load(f) idedata = IDEData(raw_data) print(f"Loaded idedata from: {idedata_path}", file=sys.stderr) @@ -264,7 +264,7 @@ def main() -> int: output_path = Path(args.output_json) output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, "w", encoding="utf-8") as f: + with output_path.open("w", encoding="utf-8") as f: json.dump(output_data, f, indent=2) print(f"Saved analysis to {args.output_json}", file=sys.stderr) diff --git a/script/clang-format b/script/clang-format index 028d752c55..df45798a30 100755 --- a/script/clang-format +++ b/script/clang-format @@ -2,6 +2,7 @@ import argparse import os +from pathlib import Path import queue import re import subprocess @@ -70,7 +71,7 @@ def main(): ) args = parser.parse_args() - cwd = os.getcwd() + cwd = Path.cwd() files = [ os.path.relpath(path, cwd) for path in git_ls_files(["*.cpp", "*.h", "*.tcc"]) ] diff --git a/script/clang-tidy b/script/clang-tidy index 1c413ffa23..56c0a9db71 100755 --- a/script/clang-tidy +++ b/script/clang-tidy @@ -2,6 +2,7 @@ import argparse import os +from pathlib import Path import queue import re import shutil @@ -32,7 +33,7 @@ def clang_options(idedata): cmd = [] # extract target architecture from triplet in g++ filename - triplet = os.path.basename(idedata["cxx_path"])[:-4] + triplet = Path(idedata["cxx_path"]).name[:-4] if triplet.startswith("xtensa-"): # clang doesn't support Xtensa (yet?), so compile in 32-bit mode and pretend we're the Xtensa compiler cmd.append("-m32") @@ -153,8 +154,8 @@ def run_tidy(executable, args, options, tmpdir, path_queue, lock, failed_files): if sys.stdout.isatty(): invocation.append("--use-color") - invocation.append(f"--header-filter={os.path.abspath(basepath)}/.*") - invocation.append(os.path.abspath(path)) + invocation.append(f"--header-filter={Path(basepath).resolve()}/.*") + invocation.append(str(Path(path).resolve())) invocation.append("--") invocation.extend(options) @@ -229,7 +230,7 @@ def main(): ) args = parser.parse_args() - cwd = os.getcwd() + cwd = Path.cwd() files = [os.path.relpath(path, cwd) for path in git_ls_files(["*.cpp"])] # Exclude benchmark files — they require google benchmark headers not # available in the ESP32 toolchain and use different naming conventions. diff --git a/script/clang_tidy_hash.py b/script/clang_tidy_hash.py index d0d8438437..f478535567 100755 --- a/script/clang_tidy_hash.py +++ b/script/clang_tidy_hash.py @@ -16,7 +16,7 @@ sys.path.insert(0, str(script_dir)) def read_file_lines(path: Path) -> list[str]: """Read lines from a file.""" - with open(path) as f: + with path.open() as f: return f.readlines() @@ -65,7 +65,7 @@ def get_clang_tidy_version_from_requirements(repo_root: Path | None = None) -> s def read_file_bytes(path: Path) -> bytes: """Read bytes from a file.""" - with open(path, "rb") as f: + with path.open("rb") as f: return f.read() @@ -120,7 +120,7 @@ def read_stored_hash(repo_root: Path | None = None) -> str | None: def write_file_content(path: Path, content: str) -> None: """Write content to a file.""" - with open(path, "w") as f: + with path.open("w") as f: f.write(content) diff --git a/script/determine-jobs.py b/script/determine-jobs.py index 01b8623813..417716cd77 100755 --- a/script/determine-jobs.py +++ b/script/determine-jobs.py @@ -306,7 +306,7 @@ def _is_clang_tidy_full_scan() -> bool: """ try: result = subprocess.run( - [os.path.join(root_path, "script", "clang_tidy_hash.py"), "--check"], + [str(Path(root_path) / "script" / "clang_tidy_hash.py"), "--check"], capture_output=True, check=False, ) diff --git a/script/helpers.py b/script/helpers.py index cf82a89f93..c56a434edf 100644 --- a/script/helpers.py +++ b/script/helpers.py @@ -17,10 +17,10 @@ from typing import Any import colorama -root_path = os.path.abspath(os.path.normpath(os.path.join(__file__, "..", ".."))) -basepath = os.path.join(root_path, "esphome") -temp_folder = os.path.join(root_path, ".temp") -temp_header_file = os.path.join(temp_folder, "all-include.cpp") +root_path = str(Path(__file__).resolve().parent.parent) +basepath = str(Path(root_path) / "esphome") +temp_folder = str(Path(root_path) / ".temp") +temp_header_file = str(Path(temp_folder) / "all-include.cpp") # C++ file extensions used for clang-tidy and clang-format checks CPP_FILE_EXTENSIONS = (".cpp", ".h", ".hpp", ".cc", ".cxx", ".c", ".tcc") @@ -339,8 +339,8 @@ def _get_github_event_data() -> dict | None: Parsed event data dictionary, or None if not available """ github_event_path = os.environ.get("GITHUB_EVENT_PATH") - if github_event_path and os.path.exists(github_event_path): - with open(github_event_path) as f: + if github_event_path and Path(github_event_path).exists(): + with Path(github_event_path).open() as f: return json.load(f) return None @@ -464,7 +464,8 @@ def _get_changed_files_from_command(command: list[str]) -> list[str]: raise Exception(f"Command failed: {' '.join(command)}\nstderr: {proc.stderr}") changed_files = splitlines_no_ends(proc.stdout) - changed_files = [os.path.relpath(f, os.getcwd()) for f in changed_files if f] + cwd = Path.cwd() + changed_files = [os.path.relpath(f, cwd) for f in changed_files if f] # noqa: PTH109 changed_files.sort() return changed_files @@ -499,7 +500,7 @@ def get_changed_components() -> list[str] | None: return None # Use list-components.py to get changed components - script_path = os.path.join(root_path, "script", "list-components.py") + script_path = str(Path(root_path) / "script" / "list-components.py") cmd = [script_path, "--changed"] try: @@ -619,7 +620,7 @@ def filter_changed(files: list[str]) -> list[str]: def filter_grep(files: list[str], value: list[str]) -> list[str]: matched = [] for file in files: - with open(file, encoding="utf-8") as handle: + with Path(file).open(encoding="utf-8") as handle: contents = handle.read() if any(v in contents for v in value): matched.append(file) diff --git a/script/lint-python b/script/lint-python index 18281c711e..e4b3314d2a 100755 --- a/script/lint-python +++ b/script/lint-python @@ -2,6 +2,7 @@ import argparse import os +from pathlib import Path import re import sys @@ -66,11 +67,12 @@ def main(): args = parser.parse_args() files = [] + cwd = Path.cwd() for path in git_ls_files(): filetypes = (".py",) - ext = os.path.splitext(path)[1] + ext = Path(path).suffix if ext in filetypes and path.startswith("esphome"): - path = os.path.relpath(path, os.getcwd()) + path = os.path.relpath(path, cwd) files.append(path) # Match against re file_name_re = re.compile("|".join(args.files)) diff --git a/script/sync-device_class.py b/script/sync-device_class.py index 121c89b8f9..660142195a 100755 --- a/script/sync-device_class.py +++ b/script/sync-device_class.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from pathlib import Path import re # pylint: disable=import-error @@ -34,10 +35,10 @@ DOMAINS = { def sub(path, pattern, repl): - with open(path, encoding="utf-8") as handle: + with Path(path).open(encoding="utf-8") as handle: content = handle.read() content = re.sub(pattern, repl, content, flags=re.MULTILINE) - with open(path, "w", encoding="utf-8") as handle: + with Path(path).open("w", encoding="utf-8") as handle: handle.write(content) diff --git a/script/test_build_components.py b/script/test_build_components.py index 51f3758291..767b55c94b 100755 --- a/script/test_build_components.py +++ b/script/test_build_components.py @@ -297,7 +297,7 @@ def write_github_summary( test_results: List of all test results """ summary_content = format_github_summary(test_results, toolchain) - with open(os.environ["GITHUB_STEP_SUMMARY"], "a", encoding="utf-8") as f: + with Path(os.environ["GITHUB_STEP_SUMMARY"]).open("a", encoding="utf-8") as f: f.write(summary_content) diff --git a/tests/dashboard/test_web_server_paths.py b/tests/dashboard/test_web_server_paths.py index b596ebb581..efeafbf3b5 100644 --- a/tests/dashboard/test_web_server_paths.py +++ b/tests/dashboard/test_web_server_paths.py @@ -34,9 +34,7 @@ def test_get_base_frontend_path_dev_mode() -> None: # The function uses Path.resolve() which resolves symlinks # The actual function adds "/" to the path, so we simulate that test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/" - expected = ( - Path(os.getcwd()) / test_path_with_slash / "esphome_dashboard" - ).resolve() + expected = (Path.cwd() / test_path_with_slash / "esphome_dashboard").resolve() assert result == expected @@ -62,9 +60,7 @@ def test_get_base_frontend_path_dev_mode_relative_path() -> None: # The function uses Path.resolve() which resolves symlinks # The actual function adds "/" to the path, so we simulate that test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/" - expected = ( - Path(os.getcwd()) / test_path_with_slash / "esphome_dashboard" - ).resolve() + expected = (Path.cwd() / test_path_with_slash / "esphome_dashboard").resolve() assert result == expected assert result.is_absolute() @@ -157,7 +153,7 @@ def test_load_file_path(tmp_path: Path) -> None: test_file = tmp_path / "test.txt" test_file.write_bytes(b"test content") - with open(test_file, "rb") as f: + with test_file.open("rb") as f: content = f.read() assert content == b"test content" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index e593929583..a9c9e0686f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -79,7 +79,7 @@ def shared_platformio_cache() -> Generator[Path]: lock_file = Path.home() / ".esphome-integration-tests-init.lock" # Always acquire the lock to ensure cache is ready before proceeding - with open(lock_file, "w") as lock_fd: + with lock_file.open("w") as lock_fd: fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX) # Check if the native platform is installed (the actual indicator of a populated cache) diff --git a/tests/script/test_check_import_time.py b/tests/script/test_check_import_time.py index 223c58002c..528ca0701c 100644 --- a/tests/script/test_check_import_time.py +++ b/tests/script/test_check_import_time.py @@ -4,7 +4,6 @@ from __future__ import annotations import importlib.util import json -import os from pathlib import Path import sys from unittest.mock import patch @@ -13,12 +12,10 @@ import pytest # Load the script-under-test as `check_import_time` (it's a hyphenated path # inside `script/` that mirrors the existing `determine_jobs` pattern). -script_dir = os.path.abspath( - os.path.join(os.path.dirname(__file__), "..", "..", "script") -) +script_dir = str((Path(__file__).parent / ".." / ".." / "script").resolve()) sys.path.insert(0, script_dir) spec = importlib.util.spec_from_file_location( - "check_import_time", os.path.join(script_dir, "check_import_time.py") + "check_import_time", str(Path(script_dir) / "check_import_time.py") ) check_import_time = importlib.util.module_from_spec(spec) spec.loader.exec_module(check_import_time) diff --git a/tests/script/test_determine_jobs.py b/tests/script/test_determine_jobs.py index 7bb9fe2543..ac3c6424bf 100644 --- a/tests/script/test_determine_jobs.py +++ b/tests/script/test_determine_jobs.py @@ -3,7 +3,6 @@ from collections.abc import Generator import importlib.util import json -import os from pathlib import Path import sys from unittest.mock import Mock, call, patch @@ -11,9 +10,7 @@ from unittest.mock import Mock, call, patch import pytest # Add the script directory to Python path so we can import the module -script_dir = os.path.abspath( - os.path.join(os.path.dirname(__file__), "..", "..", "script") -) +script_dir = str((Path(__file__).parent / ".." / ".." / "script").resolve()) sys.path.insert(0, script_dir) # Import helpers module for patching @@ -22,7 +19,7 @@ import helpers # noqa: E402 import script.helpers # noqa: E402 spec = importlib.util.spec_from_file_location( - "determine_jobs", os.path.join(script_dir, "determine-jobs.py") + "determine_jobs", str(Path(script_dir) / "determine-jobs.py") ) determine_jobs = importlib.util.module_from_spec(spec) spec.loader.exec_module(determine_jobs) diff --git a/tests/script/test_helpers.py b/tests/script/test_helpers.py index 10f258aa83..82ff5e1411 100644 --- a/tests/script/test_helpers.py +++ b/tests/script/test_helpers.py @@ -12,9 +12,7 @@ import pytest from pytest import MonkeyPatch # Add the script directory to Python path so we can import helpers -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "script")) -) +sys.path.insert(0, str((Path(__file__).parent / ".." / ".." / "script").resolve())) import helpers # noqa: E402 diff --git a/tests/script/test_test_helpers.py b/tests/script/test_test_helpers.py index 3149712563..a8100252da 100644 --- a/tests/script/test_test_helpers.py +++ b/tests/script/test_test_helpers.py @@ -1,6 +1,5 @@ """Unit tests for script/build_helpers.py manifest override and build helpers.""" -import os from pathlib import Path import sys import textwrap @@ -9,9 +8,7 @@ from unittest.mock import MagicMock, patch import pytest # Add the script directory to Python path so we can import build_helpers -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "script")) -) +sys.path.insert(0, str((Path(__file__).parent / ".." / ".." / "script").resolve())) import build_helpers # noqa: E402 diff --git a/tests/unit_tests/core/test_config.py b/tests/unit_tests/core/test_config.py index 4ce862315d..b5b35b5172 100644 --- a/tests/unit_tests/core/test_config.py +++ b/tests/unit_tests/core/test_config.py @@ -486,7 +486,7 @@ def test_preload_core_config_basic(setup_core: Path) -> None: assert CONF_BUILD_PATH in config[CONF_ESPHOME] # Verify default build path is "build/" build_path = config[CONF_ESPHOME][CONF_BUILD_PATH] - assert build_path.endswith(os.path.join("build", "test_device")) + assert build_path.endswith(str(Path("build") / "test_device")) def test_preload_core_config_with_build_path(setup_core: Path) -> None: @@ -523,7 +523,7 @@ def test_preload_core_config_env_build_path(setup_core: Path) -> None: assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH] # Verify it uses the env var path with device name appended build_path = config[CONF_ESPHOME][CONF_BUILD_PATH] - expected_path = os.path.join("/env/build", "test_device") + expected_path = str(Path("/env/build") / "test_device") assert build_path == expected_path or build_path == expected_path.replace( "/", os.sep ) @@ -739,7 +739,7 @@ async def test_add_includes_with_single_file( """Test add_includes copies a single header file to build directory.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create include file include_file = tmp_path / "my_header.h" @@ -769,7 +769,7 @@ async def test_add_includes_with_directory_unix( """Test add_includes copies all files from a directory on Unix.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create include directory with files include_dir = tmp_path / "includes" @@ -814,7 +814,7 @@ async def test_add_includes_with_directory_windows( """Test add_includes copies all files from a directory on Windows.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create include directory with files include_dir = tmp_path / "includes" @@ -856,7 +856,7 @@ async def test_add_includes_with_multiple_sources( """Test add_includes with multiple files and directories.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create various include sources single_file = tmp_path / "single.h" @@ -884,7 +884,7 @@ async def test_add_includes_empty_directory( """Test add_includes with an empty directory doesn't fail.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create empty directory empty_dir = tmp_path / "empty" @@ -906,7 +906,7 @@ async def test_add_includes_preserves_directory_structure_unix( """Test that add_includes preserves relative directory structure on Unix.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create nested directory structure lib_dir = tmp_path / "lib" @@ -940,7 +940,7 @@ async def test_add_includes_preserves_directory_structure_windows( """Test that add_includes preserves relative directory structure on Windows.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create nested directory structure lib_dir = tmp_path / "lib" @@ -973,7 +973,7 @@ async def test_add_includes_overwrites_existing_files( """Test that add_includes overwrites existing files in build directory.""" CORE.config_path = tmp_path / "config.yaml" CORE.build_path = tmp_path / "build" - os.makedirs(CORE.build_path, exist_ok=True) + CORE.build_path.mkdir(parents=True, exist_ok=True) # Create include file include_file = tmp_path / "header.h" diff --git a/tests/unit_tests/test_espidf_component.py b/tests/unit_tests/test_espidf_component.py index f50f5317de..4f0a71053d 100644 --- a/tests/unit_tests/test_espidf_component.py +++ b/tests/unit_tests/test_espidf_component.py @@ -293,7 +293,7 @@ def test_extra_script_captures_libpath_libs_and_defines(tmp_path): result = run_extra_script(script, library_dir=tmp_path, idf_target="esp32") - assert result.libpath == [os.path.join("src", "esp32")] + assert result.libpath == [str(Path("src") / "esp32")] assert result.libs == ["algobsec"] assert ("BAR", "1") in result.cppdefines assert "FOO" in result.cppdefines diff --git a/tests/unit_tests/test_substitutions.py b/tests/unit_tests/test_substitutions.py index 4783112578..c71be2fbab 100644 --- a/tests/unit_tests/test_substitutions.py +++ b/tests/unit_tests/test_substitutions.py @@ -1,4 +1,3 @@ -import glob import logging from pathlib import Path from typing import Any @@ -106,7 +105,7 @@ REMOTES = { # Collect all input YAML files for test_substitutions_fixtures parametrized tests: HERE = Path(__file__).parent BASE_DIR = HERE / "fixtures" / "substitutions" -SOURCES = sorted(glob.glob(str(BASE_DIR / "*.input.yaml"))) +SOURCES = sorted(str(p) for p in BASE_DIR.glob("*.input.yaml")) assert SOURCES, f"test_substitutions_fixtures: No input YAML files found in {BASE_DIR}" diff --git a/tests/unit_tests/test_writer.py b/tests/unit_tests/test_writer.py index 91b4bd8e87..fc49f03067 100644 --- a/tests/unit_tests/test_writer.py +++ b/tests/unit_tests/test_writer.py @@ -1358,7 +1358,7 @@ def test_clean_build_handles_readonly_files( # Create a read-only file (simulating git pack files on Windows) readonly_file = git_dir / "pack-abc123.pack" readonly_file.write_text("pack data") - os.chmod(readonly_file, stat.S_IRUSR) # Read-only + readonly_file.chmod(stat.S_IRUSR) # Read-only # Setup mocks mock_core.relative_pioenvs_path.return_value = pioenvs_dir @@ -1393,7 +1393,7 @@ def test_clean_all_handles_readonly_files( subdir.mkdir() readonly_file = subdir / "readonly.txt" readonly_file.write_text("content") - os.chmod(readonly_file, stat.S_IRUSR) # Read-only + readonly_file.chmod(stat.S_IRUSR) # Read-only # Verify file is read-only assert not os.access(readonly_file, os.W_OK) @@ -1422,7 +1422,7 @@ def test_clean_build_reraises_for_other_errors( test_file.write_text("content") # Make subdir read-only so files inside can't be deleted - os.chmod(subdir, stat.S_IRUSR | stat.S_IXUSR) + subdir.chmod(stat.S_IRUSR | stat.S_IXUSR) # Setup mocks mock_core.relative_pioenvs_path.return_value = pioenvs_dir @@ -1440,7 +1440,7 @@ def test_clean_build_reraises_for_other_errors( clean_build() finally: # Cleanup - restore write permission so tmp_path cleanup works - os.chmod(subdir, stat.S_IRWXU) + subdir.chmod(stat.S_IRWXU) # Tests for get_build_info()