[core] Enable ruff DTZ (flake8-datetimez) lint family (#16660)

This commit is contained in:
J. Nick Koston
2026-05-26 15:29:38 -05:00
committed by GitHub
parent bac62cb7de
commit 96816e2491
12 changed files with 40 additions and 28 deletions

View File

@@ -639,7 +639,7 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
chunk = ser.read(ser.in_waiting or 1) chunk = ser.read(ser.in_waiting or 1)
if not chunk: if not chunk:
continue continue
time_ = datetime.now() time_ = datetime.now().astimezone()
milliseconds = time_.microsecond // 1000 milliseconds = time_.microsecond // 1000
time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]" time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{milliseconds:03}]"

View File

@@ -119,7 +119,7 @@ async def async_run_logs(
def on_log(msg: SubscribeLogsResponse) -> None: def on_log(msg: SubscribeLogsResponse) -> None:
"""Handle a new log message.""" """Handle a new log message."""
time_ = datetime.now() time_ = datetime.now().astimezone()
message: bytes = msg.message message: bytes = msg.message
text = message.decode("utf8", "backslashreplace") text = message.decode("utf8", "backslashreplace")
nanoseconds = time_.microsecond // 1000 nanoseconds = time_.microsecond // 1000

View File

@@ -161,7 +161,10 @@ async def _attr_to_code(config: ConfigType) -> None:
zigbee_set_string(basic_attrs.mf_name, "esphome"), zigbee_set_string(basic_attrs.mf_name, "esphome"),
zigbee_set_string(basic_attrs.model_id, config[CONF_MODEL]), zigbee_set_string(basic_attrs.model_id, config[CONF_MODEL]),
zigbee_set_string( zigbee_set_string(
basic_attrs.date_code, datetime.datetime.now().strftime("%Y%m%d %H%M%S") basic_attrs.date_code,
# Local build time, matching the esp32 implementation
# (App.get_build_time() in C++).
datetime.datetime.now().astimezone().strftime("%Y%m%d %H%M%S"),
), ),
zigbee_assign( zigbee_assign(
basic_attrs.power_source, basic_attrs.power_source,

View File

@@ -1183,7 +1183,9 @@ def date_time(date: bool, time: bool):
format += "%p" format += "%p"
try: try:
date_obj = datetime.strptime(value, format) # The generated format never includes %z/%Z, so this parses a
# naive wall-clock date/time by design.
date_obj = datetime.strptime(value, format) # noqa: DTZ007
except ValueError as err: except ValueError as err:
raise Invalid(f"Invalid {exc_message}: {err}") from err raise Invalid(f"Invalid {exc_message}: {err}") from err

View File

@@ -7,6 +7,7 @@ from datetime import UTC, datetime
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
import time
import requests import requests
@@ -141,9 +142,11 @@ def has_remote_file_changed(
def is_file_recent(file_path: Path, refresh: TimePeriodSeconds) -> bool: def is_file_recent(file_path: Path, refresh: TimePeriodSeconds) -> bool:
if file_path.exists(): if file_path.exists():
creation_time = file_path.stat().st_ctime # st_mtime, not st_ctime: ctime is inode-change time on POSIX
current_time = datetime.now().timestamp() # (bumped by chmod/chown/rename) so a metadata touch would make
return current_time - creation_time <= refresh.total_seconds # the file look fresh.
modification_time = file_path.stat().st_mtime
return time.time() - modification_time <= refresh.total_seconds
return False return False

View File

@@ -1,12 +1,12 @@
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
import hashlib import hashlib
import logging import logging
from pathlib import Path from pathlib import Path
import re import re
import subprocess import subprocess
import sys import sys
import time
import urllib.parse import urllib.parse
import esphome.config_validation as cv import esphome.config_validation as cv
@@ -247,11 +247,11 @@ def clone_or_update(
return repo_dir, None return repo_dir, None
file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD") file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD")
# On first clone, FETCH_HEAD does not exists # On first clone, FETCH_HEAD does not exist
if not file_timestamp.exists(): if not file_timestamp.exists():
file_timestamp = Path(repo_dir / ".git" / "HEAD") file_timestamp = Path(repo_dir / ".git" / "HEAD")
age = datetime.now() - datetime.fromtimestamp(file_timestamp.stat().st_mtime) age_seconds = time.time() - file_timestamp.stat().st_mtime
if refresh is None or age.total_seconds() > refresh.total_seconds: if refresh is None or age_seconds > refresh.total_seconds:
# Try to update the repository, recovering from broken state if needed # Try to update the repository, recovering from broken state if needed
old_sha: str | None = None old_sha: str | None = None
try: try:

View File

@@ -139,7 +139,7 @@ def show_discover(config, username=None, password=None, client_id=None):
_LOGGER.info("Starting log output from %s", topic) _LOGGER.info("Starting log output from %s", topic)
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
time_ = datetime.now().time().strftime("[%H:%M:%S]") time_ = datetime.now().astimezone().time().strftime("[%H:%M:%S]")
payload = msg.payload.decode(errors="backslashreplace") payload = msg.payload.decode(errors="backslashreplace")
if len(payload) > 0: if len(payload) > 0:
message = time_ + " " + payload message = time_ + " " + payload
@@ -184,7 +184,7 @@ def get_esphome_device_ip(
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
nonlocal dev_ip nonlocal dev_ip
time_ = datetime.now().time().strftime("[%H:%M:%S]") time_ = datetime.now().astimezone().time().strftime("[%H:%M:%S]")
payload = msg.payload.decode(errors="backslashreplace") payload = msg.payload.decode(errors="backslashreplace")
if len(payload) > 0: if len(payload) > 0:
message = time_ + " " + payload message = time_ + " " + payload
@@ -253,7 +253,7 @@ def show_logs(config, topic=None, username=None, password=None, client_id=None):
_LOGGER.info("Starting log output from %s", topic) _LOGGER.info("Starting log output from %s", topic)
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
time_ = datetime.now().time().strftime("[%H:%M:%S]") time_ = datetime.now().astimezone().time().strftime("[%H:%M:%S]")
payload = msg.payload.decode(errors="backslashreplace") payload = msg.payload.decode(errors="backslashreplace")
message = time_ + payload message = time_ + payload
safe_print(message) safe_print(message)

View File

@@ -338,7 +338,10 @@ class EsphomeStorageJSON:
@property @property
def last_update_check(self) -> datetime | None: def last_update_check(self) -> datetime | None:
try: try:
return datetime.strptime(self.last_update_check_str, "%Y-%m-%dT%H:%M:%S") # Stored format is naive ISO without %z; preserved for backward compat.
return datetime.strptime( # noqa: DTZ007
self.last_update_check_str, "%Y-%m-%dT%H:%M:%S"
)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
return None return None

View File

@@ -113,6 +113,7 @@ exclude = ['generated']
select = [ select = [
"B", # flake8-bugbear "B", # flake8-bugbear
"C4", # flake8-comprehensions "C4", # flake8-comprehensions
"DTZ", # flake8-datetimez
"E", # pycodestyle "E", # pycodestyle
"EXE", # flake8-executable "EXE", # flake8-executable
"F", # pyflakes/autoflake "F", # pyflakes/autoflake

View File

@@ -120,7 +120,7 @@ def test_is_file_recent_with_old_file(setup_core: Path) -> None:
old_time = time.time() - 7200 old_time = time.time() - 7200
mock_stat = MagicMock() mock_stat = MagicMock()
mock_stat.st_ctime = old_time mock_stat.st_mtime = old_time
with patch.object(Path, "stat", return_value=mock_stat): with patch.object(Path, "stat", return_value=mock_stat):
refresh = TimePeriod(seconds=3600) refresh = TimePeriod(seconds=3600)
@@ -147,7 +147,7 @@ def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None:
# Mock stat to return a time 10 seconds ago # Mock stat to return a time 10 seconds ago
mock_stat = MagicMock() mock_stat = MagicMock()
mock_stat.st_ctime = time.time() - 10 mock_stat.st_mtime = time.time() - 10
with patch.object(Path, "stat", return_value=mock_stat): with patch.object(Path, "stat", return_value=mock_stat):
refresh = TimePeriod(seconds=0) refresh = TimePeriod(seconds=0)
result = external_files.is_file_recent(test_file, refresh) result = external_files.is_file_recent(test_file, refresh)

View File

@@ -1,8 +1,8 @@
"""Tests for git.py module.""" """Tests for git.py module."""
from datetime import datetime, timedelta
import os import os
from pathlib import Path from pathlib import Path
import time
from typing import Any from typing import Any
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
@@ -34,9 +34,9 @@ def _setup_old_repo(repo_dir: Path, days_old: int = 2) -> None:
# Create FETCH_HEAD file with old timestamp # Create FETCH_HEAD file with old timestamp
fetch_head = git_dir / "FETCH_HEAD" fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test") fetch_head.write_text("test")
old_time = datetime.now() - timedelta(days=days_old) old_time = time.time() - days_old * 86400
fetch_head.touch() fetch_head.touch()
os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp())) os.utime(fetch_head, (old_time, old_time))
def _get_git_command_type(cmd: list[str]) -> str | None: def _get_git_command_type(cmd: list[str]) -> str | None:
@@ -285,10 +285,10 @@ def test_clone_or_update_with_refresh_updates_old_repo(
# Create FETCH_HEAD file with old timestamp (2 days ago) # Create FETCH_HEAD file with old timestamp (2 days ago)
fetch_head = git_dir / "FETCH_HEAD" fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test") fetch_head.write_text("test")
old_time = datetime.now() - timedelta(days=2) old_time = time.time() - 2 * 86400
fetch_head.touch() # Create the file fetch_head.touch() # Create the file
# Set modification time to 2 days ago # Set modification time to 2 days ago
os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp())) os.utime(fetch_head, (old_time, old_time))
# Mock git command responses # Mock git command responses
mock_run_git_command.return_value = "abc123" # SHA for rev-parse mock_run_git_command.return_value = "abc123" # SHA for rev-parse
@@ -333,10 +333,10 @@ def test_clone_or_update_with_refresh_skips_fresh_repo(
# Create FETCH_HEAD file with recent timestamp (1 hour ago) # Create FETCH_HEAD file with recent timestamp (1 hour ago)
fetch_head = git_dir / "FETCH_HEAD" fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test") fetch_head.write_text("test")
recent_time = datetime.now() - timedelta(hours=1) recent_time = time.time() - 3600
fetch_head.touch() # Create the file fetch_head.touch() # Create the file
# Set modification time to 1 hour ago # Set modification time to 1 hour ago
os.utime(fetch_head, (recent_time.timestamp(), recent_time.timestamp())) os.utime(fetch_head, (recent_time, recent_time))
# Call with refresh=1d (1 day) # Call with refresh=1d (1 day)
refresh = TimePeriodSeconds(days=1) refresh = TimePeriodSeconds(days=1)
@@ -409,10 +409,10 @@ def test_clone_or_update_with_none_refresh_always_updates(
# Create FETCH_HEAD file with very recent timestamp (1 second ago) # Create FETCH_HEAD file with very recent timestamp (1 second ago)
fetch_head = git_dir / "FETCH_HEAD" fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test") fetch_head.write_text("test")
recent_time = datetime.now() - timedelta(seconds=1) recent_time = time.time() - 1
fetch_head.touch() # Create the file fetch_head.touch() # Create the file
# Set modification time to 1 second ago # Set modification time to 1 second ago
os.utime(fetch_head, (recent_time.timestamp(), recent_time.timestamp())) os.utime(fetch_head, (recent_time, recent_time))
# Mock git command responses # Mock git command responses
mock_run_git_command.return_value = "abc123" # SHA for rev-parse mock_run_git_command.return_value = "abc123" # SHA for rev-parse

View File

@@ -576,8 +576,8 @@ def test_esphome_storage_json_last_update_check_property() -> None:
assert result.hour == 10 assert result.hour == 10
assert result.minute == 30 assert result.minute == 30
# Test setter # Test setter — naive datetime matches the storage round-trip format.
new_date = datetime(2024, 2, 20, 15, 45, 30) new_date = datetime(2024, 2, 20, 15, 45, 30) # noqa: DTZ001
storage.last_update_check = new_date storage.last_update_check = new_date
assert storage.last_update_check_str == "2024-02-20T15:45:30" assert storage.last_update_check_str == "2024-02-20T15:45:30"