[core] Enable ruff B (flake8-bugbear) lint family (#16655)

This commit is contained in:
J. Nick Koston
2026-05-25 21:28:14 -05:00
committed by GitHub
parent 489cf483d0
commit ae814cff5c
27 changed files with 54 additions and 50 deletions

View File

@@ -608,7 +608,7 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
process_stacktrace = getattr(module, "process_stacktrace")
process_stacktrace = module.process_stacktrace
except (AttributeError, ImportError):
_LOGGER.info(
'Stacktrace analysis is unavailable: no compatible analyzer found for target platform "%s".',
@@ -1101,7 +1101,7 @@ def upload_program(
host = devices[0]
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
if getattr(module, "upload_program")(config, args, host):
if module.upload_program(config, args, host):
return 0, host
except AttributeError:
pass
@@ -1353,7 +1353,7 @@ def _validate_bootloader_binary(binary: Path) -> None:
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
if getattr(module, "show_logs")(config, args, devices):
if module.show_logs(config, args, devices):
return 0
except AttributeError:
pass

View File

@@ -509,7 +509,7 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
lines.append(
f"{_COMPONENT_CORE} Symbols > {self.SYMBOL_SIZE_THRESHOLD} B ({len(large_core_symbols)} symbols):"
)
for i, (symbol, demangled, size) in enumerate(large_core_symbols):
for i, (_symbol, demangled, size) in enumerate(large_core_symbols):
# Core symbols only track (symbol, demangled, size) without section info,
# so we don't show section labels here
lines.append(
@@ -601,7 +601,7 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
lines.append(
f"{comp_name} Symbols > {self.SYMBOL_SIZE_THRESHOLD} B & storage ({len(large_symbols)} symbols):"
)
for i, (symbol, demangled, size, section) in enumerate(large_symbols):
for i, (_symbol, demangled, size, section) in enumerate(large_symbols):
lines.append(
f"{i + 1}. {self._format_symbol_with_section(demangled, size, section)}"
)
@@ -640,7 +640,7 @@ class MemoryAnalyzerCLI(MemoryAnalyzer):
lines.append(
f" Symbols > {self.RAM_SYMBOL_SIZE_THRESHOLD} B ({len(large_ram_syms)}):"
)
for symbol, demangled, size, section in large_ram_syms[:10]:
for _symbol, demangled, size, section in large_ram_syms[:10]:
# Format section label consistently by stripping leading dot
section_label = section.lstrip(".") if section else ""
display_name = _format_pstorage_name(demangled)

View File

@@ -154,7 +154,7 @@ def batch_demangle(
failed_count = 0
for original, stripped, prefix, demangled in zip(
symbols, symbols_stripped, symbols_prefixes, demangled_lines
symbols, symbols_stripped, symbols_prefixes, demangled_lines, strict=True
):
# Add back any prefix that was removed
demangled = _restore_symbol_prefix(prefix, stripped, demangled)

View File

@@ -108,7 +108,7 @@ async def async_run_logs(
platform_process_stacktrace = None
try:
module = importlib.import_module("esphome.components." + CORE.target_platform)
platform_process_stacktrace = getattr(module, "process_stacktrace")
platform_process_stacktrace = module.process_stacktrace
except (AttributeError, ImportError):
_LOGGER.info(
'Stacktrace analysis is unavailable: no compatible analyzer found for target platform "%s".',

View File

@@ -594,7 +594,9 @@ async def to_code(config):
x.height,
]
for (x, y) in zip(
glyph_args, list(accumulate([len(x.bitmap_data) for x in glyph_args]))
glyph_args,
list(accumulate([len(x.bitmap_data) for x in glyph_args])),
strict=True,
)
]

View File

@@ -239,7 +239,7 @@ def color_retmapper(value):
else:
r, g, b, _ = from_rgbw(cval)
return literal(f"lv_color_make({r}, {g}, {b})")
assert False
raise AssertionError(f"Unhandled lv_color value: {value!r}")
def option_string(value):

View File

@@ -97,7 +97,7 @@ class TabviewType(WidgetType):
tab_bar = Widget(bar_obj, obj_spec)
await set_obj_properties(tab_bar, tab_style)
if tab_items_style:
for index, tab_conf in enumerate(config[CONF_TABS]):
for index, _tab_conf in enumerate(config[CONF_TABS]):
await set_obj_properties(
Widget(lv_obj.get_child(bar_obj, index), button_spec),
tab_items_style,

View File

@@ -26,7 +26,7 @@ CONFIG_SCHEMA = MSA_SENSOR_SCHEMA.extend(
),
key=CONF_NAME,
)
for event, icon in zip(EVENT_SENSORS, ICONS)
for event, icon in zip(EVENT_SENSORS, ICONS, strict=True)
}
)

View File

@@ -21,7 +21,7 @@ async def new_openthermoutput(
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
await output.register_output(var, config)
cg.add(getattr(var, "set_id")(cg.RawExpression(f'"{key}_{config[CONF_ID]}"')))
cg.add(var.set_id(cg.RawExpression(f'"{key}_{config[CONF_ID]}"')))
input.generate_setters(var, config)
return var

View File

@@ -1192,7 +1192,7 @@ def _std(x):
def _correlation_coeff(x, y):
m_x, m_y = _mean(x), _mean(y)
s_xy = sum((x_ - m_x) * (y_ - m_y) for x_, y_ in zip(x, y))
s_xy = sum((x_ - m_x) * (y_ - m_y) for x_, y_ in zip(x, y, strict=True))
s_sq_x = sum((x_ - m_x) ** 2 for x_ in x)
s_sq_y = sum((y_ - m_y) ** 2 for y_ in y)
return s_xy / math.sqrt(s_sq_x * s_sq_y)
@@ -1228,7 +1228,7 @@ def _mat_copy(m):
def _mat_transpose(m):
return _mat_copy(zip(*m))
return _mat_copy(zip(*m, strict=True))
def _mat_identity(n):
@@ -1237,7 +1237,10 @@ def _mat_identity(n):
def _mat_dot(a, b):
b_t = _mat_transpose(b)
return [[sum(x * y for x, y in zip(row_a, col_b)) for col_b in b_t] for row_a in a]
return [
[sum(x * y for x, y in zip(row_a, col_b, strict=True)) for col_b in b_t]
for row_a in a
]
def _mat_inverse(m):

View File

@@ -1081,7 +1081,7 @@ class EnumValue:
@enum_value.setter
def enum_value(self, value):
setattr(self, "_enum_value", value)
self._enum_value = value
CORE = EsphomeCore()

View File

@@ -115,7 +115,7 @@ class MDNSStatus:
results = await asyncio.gather(
*(self.aiozc.async_resolve_host(name) for name in poll_names)
)
for name, address_list in zip(poll_names, results):
for name, address_list in zip(poll_names, results, strict=True):
result = bool(address_list)
host_mdns_state[name] = result
for entry in poll_names[name]:

View File

@@ -83,7 +83,7 @@ class PingStatus:
return_exceptions=True,
)
for entry, result in zip(ping_group, dns_results):
for entry, result in zip(ping_group, dns_results, strict=True):
if isinstance(result, Exception):
# Only update state if its unknown or from ping
# so we don't mark it as offline if we have a state
@@ -106,7 +106,7 @@ class PingStatus:
return_exceptions=True,
)
for entry_addresses, result in zip(entry_addresses, results):
for entry_address, result in zip(entry_addresses, results, strict=True):
if isinstance(result, Exception):
ping_result = False
elif isinstance(result, BaseException):
@@ -114,7 +114,7 @@ class PingStatus:
else:
host: Host = result
ping_result = host.is_alive
entry: DashboardEntry = entry_addresses[0]
entry: DashboardEntry = entry_address[0]
# If we can reach it via ping, we always set it
# online, however if we can't reach it via ping
# we only set it to offline if the state is unknown

View File

@@ -1030,7 +1030,7 @@ class DownloadListRequestHandler(BaseHandler):
try:
module = importlib.import_module(f"esphome.components.{platform}")
get_download_types = getattr(module, "get_download_types")
get_download_types = module.get_download_types
except AttributeError as exc:
raise ValueError(f"Unknown platform {platform}") from exc
downloads = get_download_types(storage_json)
@@ -1146,7 +1146,7 @@ class MainRequestHandler(BaseHandler):
begin = bool(self.get_argument("begin", False))
if settings.using_password:
# Simply accessing the xsrf_token sets the cookie for us
self.xsrf_token # pylint: disable=pointless-statement
self.xsrf_token # pylint: disable=pointless-statement # noqa: B018
else:
self.clear_cookie("_xsrf")
@@ -1519,7 +1519,10 @@ def get_static_file_url(name: str) -> str:
return f"{base}?hash={hash_}"
def make_app(debug=get_bool_env(ENV_DEV)) -> tornado.web.Application:
def make_app(debug: bool | None = None) -> tornado.web.Application:
if debug is None:
debug = get_bool_env(ENV_DEV)
def log_function(handler: tornado.web.RequestHandler) -> None:
if handler.get_status() < 400:
log_method = access_log.info

View File

@@ -358,7 +358,7 @@ def copy_src_tree():
platform = "esphome.components." + CORE.target_platform
try:
module = importlib.import_module(platform)
copy_files = getattr(module, "copy_files")
copy_files = module.copy_files
copy_files()
except AttributeError:
pass

View File

@@ -249,7 +249,7 @@ async def async_resolve_hosts(
),
return_exceptions=True,
)
for host, result in zip(pending, results):
for host, result in zip(pending, results, strict=True):
if isinstance(result, BaseException):
_LOGGER.debug("Failed to resolve %s: %s", host, result)

View File

@@ -111,6 +111,7 @@ exclude = ['generated']
[tool.ruff.lint]
select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"E", # pycodestyle
"EXE", # flake8-executable

View File

@@ -3551,7 +3551,7 @@ static const char *const TAG = "api.service";
if id_ is not None and not mt.options.deprecated:
id_to_msg_name[id_] = mt.name
for id_, (_, _, case_label) in cases:
for id_, (_, _, _case_label) in cases:
msg_name = id_to_msg_name.get(id_, "")
if msg_name in message_auth_map:
needs_auth = message_auth_map[msg_name]
@@ -3614,7 +3614,7 @@ static const char *const TAG = "api.service";
# Dispatch switch
out += " switch (msg_type) {\n"
for i, (case, ifdef, case_label) in cases:
for _i, (case, ifdef, case_label) in cases:
if ifdef is not None:
out += _make_ifdef_line(ifdef) + "\n"

View File

@@ -972,7 +972,7 @@ def convert(schema, config_var, path):
}
elif schema_type == "use_id":
if inspect.ismodule(data):
m_attr_obj = getattr(data, "CONFIG_SCHEMA")
m_attr_obj = data.CONFIG_SCHEMA
use_schema = known_schemas.get(repr(m_attr_obj))
if use_schema:
[output_module, output_name] = use_schema[0][1].split(".")

View File

@@ -342,8 +342,8 @@ def lint_const_ordered(fname, content):
(i + 1, line) for i, line in enumerate(lines) if line.startswith(start)
]
ordered = sorted(matching, key=lambda x: x[1].replace("_", " "))
ordered = [(mi, ol) for (mi, _), (_, ol) in zip(matching, ordered)]
for (mi, mline), (_, ol) in zip(matching, ordered):
ordered = [(mi, ol) for (mi, _), (_, ol) in zip(matching, ordered, strict=True)]
for (mi, mline), (_, ol) in zip(matching, ordered, strict=True):
if mline == ol:
continue
target = next(i for i, line in ordered if line == mline)

View File

@@ -1047,7 +1047,7 @@ def detect_memory_impact_config(
# Find common platforms supported by ALL components
# This ensures we can build all components together in a merged config
common_platforms = set(MEMORY_IMPACT_PLATFORM_PREFERENCE)
for component, platforms in component_platforms_map.items():
for platforms in component_platforms_map.values():
common_platforms &= platforms
# Select the most preferred platform from the common set

View File

@@ -295,7 +295,7 @@ def main() -> int:
# Sort groups by signature for readability
groupable_groups = []
isolated_groups = []
for (platform, signature), group_comps in sorted(signature_groups.items()):
for (_platform, signature), group_comps in sorted(signature_groups.items()):
if signature.startswith(ISOLATED_SIGNATURE_PREFIX):
isolated_groups.append((signature, group_comps))
else:

View File

@@ -890,7 +890,7 @@ def run_grouped_component_tests(
print("=" * 80 + "\n")
# Execute grouped tests
for (platform, signature), components in grouped_components.items():
for (platform, _signature), components in grouped_components.items():
# Only group if we have multiple components with same signature
if len(components) <= 1:
continue
@@ -1055,7 +1055,7 @@ def test_components(
# Create empty test files for each platform (or filtered platform)
reference_tests: list[Path] = []
for platform_name, base_file in platform_bases.items():
for platform_name in platform_bases:
if platform_filter and not platform_name.startswith(platform_filter):
continue
# Create an empty test file named to match the platform

View File

@@ -2,6 +2,8 @@
from unittest.mock import patch
import pytest
from esphome.components.display import (
DisplayMetaData,
add_metadata,
@@ -74,8 +76,5 @@ def test_add_metadata_overwrites_existing():
def test_metadata_is_frozen():
"""Test that DisplayMetaData instances are immutable (frozen dataclass)."""
meta = DisplayMetaData(320, 240, True, False)
try:
with pytest.raises(AttributeError):
meta.width = 640
assert False, "Expected FrozenInstanceError"
except AttributeError:
pass

View File

@@ -510,15 +510,9 @@ def test_package_merge_by_missing_id() -> None:
],
}
error_raised = False
try:
with pytest.raises(cv.Invalid) as exc_info:
packages_pass(config)
assert False, "Expected validation error for missing ID"
except cv.Invalid as err:
error_raised = True
assert err.path == [CONF_SENSOR, 2]
assert error_raised
assert exc_info.value.path == [CONF_SENSOR, 2]
def test_package_list_remove_by_id() -> None:

View File

@@ -407,8 +407,10 @@ async def wait_and_connect_api_client(
# Wait for connection with timeout
try:
await asyncio.wait_for(connected_future, timeout=timeout)
except TimeoutError:
raise TimeoutError(f"Failed to connect to API after {timeout} seconds")
except TimeoutError as err:
raise TimeoutError(
f"Failed to connect to API after {timeout} seconds"
) from err
if return_disconnect_event:
yield client, disconnect_event

View File

@@ -67,7 +67,7 @@ def test_iter_component_configs_with_multi_conf(mock_get_component: Mock) -> Non
configs = list(config.iter_component_configs(test_config))
assert len(configs) == 2
for domain, component, conf in configs:
for domain, _component, conf in configs:
assert domain == "switch"
assert "name" in conf