Cleanup mypy ignore in airflow-core where possible #53289

Merged: 1 commit, Jul 13, 2025
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/api_fastapi/auth/tokens.py
@@ -226,7 +226,7 @@ def _conf_factory(section, key, **kwargs):
     def factory() -> str:
         from airflow.configuration import conf

-        return conf.get(section, key, **kwargs, suppress_warnings=True)  # type: ignore[return-value]
+        return conf.get(section, key, **kwargs, suppress_warnings=True)

     return factory
@@ -538,7 +538,7 @@ def get_signing_key(section: str, key: str, make_secret_key_if_needed: bool = Tr
         raise ValueError(f"The value {section}/{key} must be set!")

     # Mypy can't grock the `if not secret_key`
-    return secret_key  # type: ignore[return-value]
+    return secret_key


 def get_signing_args(make_secret_key_if_needed: bool = True) -> dict[str, Any]:
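The second hunk drops its ignore because modern mypy narrows `str | None` to `str` once the falsy branch raises. A minimal standalone sketch of that narrowing (hypothetical function, not Airflow code):

```python
def require(value: str | None) -> str:
    if not value:
        # Raising here eliminates the None (and empty-string) branch,
        # so mypy narrows `value` to `str` below.
        raise ValueError("value must be set!")
    return value  # no `# type: ignore[return-value]` needed
```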
@@ -37,7 +37,7 @@ class DagVersionResponse(BaseModel):
     dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name"))

     # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
+    @computed_field  # type: ignore[prop-decorator]
     @property
     def bundle_url(self) -> str | None:
         if self.bundle_name:
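These `@computed_field` ignores are retagged rather than removed: newer mypy (1.12 and later, per its release notes) reports decorators stacked on top of `@property` under the dedicated `prop-decorator` code, a sub-code of the old catch-all `misc`. The same retag repeats in the hunks below. A small sketch of the pattern, assuming pydantic v2:

```python
from pydantic import BaseModel, computed_field


class Rect(BaseModel):
    width: int
    height: int

    # mypy cannot model decorators on top of @property (python/mypy#1362);
    # recent mypy flags this as [prop-decorator] instead of [misc].
    @computed_field  # type: ignore[prop-decorator]
    @property
    def area(self) -> int:
        return self.width * self.height


print(Rect(width=3, height=4).area)  # 12
```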
@@ -105,7 +105,7 @@ def get_timetable_summary(cls, tts: str | None) -> str | None:
         return str(tts)

     # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
+    @computed_field  # type: ignore[prop-decorator]
     @property
     def file_token(self) -> str:
         """Return file token."""

@@ -185,14 +185,14 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
         return {k: v.dump() for k, v in params.items()}

     # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
+    @computed_field  # type: ignore[prop-decorator]
     @property
     def concurrency(self) -> int:
         """Return max_active_tasks as concurrency."""
         return self.max_active_tasks

     # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
+    @computed_field  # type: ignore[prop-decorator]
     @property
     def latest_dag_version(self) -> DagVersionResponse | None:
         """Return the latest DagVersion."""
@@ -100,7 +100,7 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
         return {param_name: param_val.dump() for param_name, param_val in params.items()}

     # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[misc]
+    @computed_field  # type: ignore[prop-decorator]
     @property
     def extra_links(self) -> list[str]:
         """Extract and return extra_links."""
@@ -50,11 +50,11 @@ def handle_request(self) -> BulkResponse:
             results[action.action.value] = BulkActionResponse()

             if action.action == BulkAction.CREATE:
-                self.handle_bulk_create(action, results[action.action.value])  # type: ignore
+                self.handle_bulk_create(action, results[action.action.value])
             elif action.action == BulkAction.UPDATE:
-                self.handle_bulk_update(action, results[action.action.value])  # type: ignore
+                self.handle_bulk_update(action, results[action.action.value])
             elif action.action == BulkAction.DELETE:
-                self.handle_bulk_delete(action, results[action.action.value])  # type: ignore
+                self.handle_bulk_delete(action, results[action.action.value])

         return BulkResponse(**results)
@@ -100,7 +100,7 @@ class CadwynWithOpenAPICustomization(Cadwyn):
     # Workaround lack of customzation https://github.com/zmievsa/cadwyn/issues/255
     async def openapi_jsons(self, req: Request) -> JSONResponse:
         resp = await super().openapi_jsons(req)
-        open_apischema = json.loads(resp.body)  # type: ignore[arg-type]
+        open_apischema = json.loads(resp.body)
         open_apischema = self.customize_openapi(open_apischema)

         resp.body = resp.render(open_apischema)
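The `[arg-type]` ignore on `json.loads` was never load-bearing: typeshed declares `json.loads(s: str | bytes | bytearray, ...)`, which covers a `bytes` response body. Quick check:

```python
import json

body: bytes = b'{"openapi": "3.1.0"}'
schema = json.loads(body)  # accepts str, bytes, or bytearray
print(schema["openapi"])
```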
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/configuration.py
@@ -961,10 +961,10 @@ def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str
     @overload  # type: ignore[override]
     def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

-    @overload  # type: ignore[override]
+    @overload
     def get(self, section: str, key: str, **kwargs) -> str | None: ...

-    def get(  # type: ignore[override,misc]
+    def get(  # type: ignore[misc]
         self,
         section: str,
         key: str,

@@ -2102,7 +2102,7 @@ def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigPars
         )
     else:
         # there
-        AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")  # type: ignore[assignment]
+        AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")
         warnings.warn(msg, category=DeprecationWarning, stacklevel=1)

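As the first hunk shows, mypy pins the incompatible-override report to the first `@overload` of the redefinition, so only that overload keeps an `[override]` ignore, while the implementation keeps `[misc]` for the overload/implementation mismatch. A stripped-down sketch of the shape (hypothetical classes; the ignore placement mirrors the real file):

```python
from typing import Any, overload


class Base:
    def get(self, section: str, key: str, **kwargs: Any) -> Any: ...


class Derived(Base):
    @overload  # type: ignore[override]
    def get(self, section: str, key: str, fallback: str = ...) -> str: ...

    @overload
    def get(self, section: str, key: str, **kwargs: Any) -> str | None: ...

    def get(self, section, key, fallback=None, **kwargs):  # type: ignore[misc]
        return fallback
```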
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -881,7 +881,7 @@ def client(self) -> Client:

         client = Client(base_url=None, token="", dry_run=True, transport=self._api_server.transport)
         # Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
-        client.base_url = "http://in-process.invalid./"  # type: ignore[assignment]
+        client.base_url = "http://in-process.invalid./"
         return client

     def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
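The `base_url` ignore could go because `httpx.Client.base_url` is a property whose setter accepts `URL | str`, and mypy gained support for setter parameter types that differ from the getter's return type (around mypy 1.11, an assumption based on its changelog). The same change appears again in triggerer_job_runner.py below. Sketch, assuming httpx is installed:

```python
import httpx

client = httpx.Client(base_url="http://placeholder.invalid")
client.base_url = "http://in-process.invalid./"  # setter takes URL | str
print(repr(client.base_url))  # URL('http://in-process.invalid./')
```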
@@ -1113,7 +1113,7 @@ def reload_configuration_for_dag_processing():
     # iterating on https://github.com/apache/airflow/pull/19860
     # The issue that describes the problem and possible remediation is
     # at https://github.com/apache/airflow/issues/19934
-    importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0]))  # type: ignore
+    importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0]))
     importlib.reload(airflow.settings)
     airflow.settings.initialize()
     del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]

4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/processor.py
@@ -219,7 +219,7 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil

     callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
     # TODO:We need a proper context object!
-    context: Context = {  # type: ignore[assignment]
+    context: Context = {
         "dag": dag,
         "run_id": request.run_id,
         "reason": request.msg,
@@ -357,7 +357,7 @@ def _on_child_started(
         )
         self.send_msg(msg, request_id=0)

-    def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:  # type: ignore[override]
+    def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:
         from airflow.sdk.api.datamodels._generated import ConnectionResponse, VariableResponse

         resp: BaseModel | None = None

2 changes: 1 addition & 1 deletion airflow-core/src/airflow/executors/local_executor.py
@@ -162,7 +162,7 @@ def start(self) -> None:

         # Mypy sees this value as `SynchronizedBase[c_uint]`, but that isn't the right runtime type behaviour
         # (it looks like an int to python)
-        self._unread_messages = multiprocessing.Value(ctypes.c_uint)  # type: ignore[assignment]
+        self._unread_messages = multiprocessing.Value(ctypes.c_uint)

     def _check_workers(self):
         # Reap any dead workers
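For context, `multiprocessing.Value(ctypes.c_uint)` returns a synchronized wrapper whose `.value` reads and writes a plain `int` at runtime, whatever the stub says. Minimal usage sketch:

```python
import ctypes
import multiprocessing

unread = multiprocessing.Value(ctypes.c_uint)  # shared unsigned counter
with unread.get_lock():  # the wrapper carries its own lock
    unread.value += 1
print(unread.value)  # 1
```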
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1001,7 +1001,7 @@ def _execute(self) -> int | None:

             self._run_scheduler_loop()

-            settings.Session.remove()  # type: ignore
+            settings.Session.remove()
         except Exception:
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
             raise
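`settings.Session` is a SQLAlchemy `scoped_session`, and `remove()` is part of its typed API under SQLAlchemy 2.0, so the bare ignore suppressed nothing. Sketch, assuming an in-memory SQLite engine:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

Session()         # materialize the thread-local session
Session.remove()  # close and discard it; fully typed, no ignore needed
```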
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -371,10 +371,10 @@ def client(self) -> Client:

         client = Client(base_url=None, token="", dry_run=True, transport=in_process_api_server().transport)
         # Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
-        client.base_url = "http://in-process.invalid./"  # type: ignore[assignment]
+        client.base_url = "http://in-process.invalid./"
         return client

-    def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, req_id: int) -> None:  # type: ignore[override]
+    def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, req_id: int) -> None:
         from airflow.sdk.api.datamodels._generated import (
             ConnectionResponse,
             TaskStatesResponse,

2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dag.py
@@ -1644,7 +1644,7 @@ def bulk_write_to_db(
         log.info("Sync %s DAGs", len(dags))
         dag_op = DagModelOperation(
             bundle_name=bundle_name, bundle_version=bundle_version, dags={d.dag_id: d for d in dags}
-        )  # type: ignore[misc]
+        )

         orm_dags = dag_op.add_dags(session=session)
         dag_op.update_dags(orm_dags, session=session)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagrun.py
@@ -1347,7 +1347,7 @@ def notify_dagrun_state_changed(self, msg: str = ""):

     def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: str = "success"):
         """Only needed for `dag.test` where `execute_callbacks=True` is passed to `update_state`."""
-        context: Context = {  # type: ignore[assignment]
+        context: Context = {
             "dag": dag,
             "run_id": str(self.run_id),
             "reason": reason,
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/models/mappedoperator.py
@@ -58,7 +58,7 @@
     getstate_setstate=False,
     repr=False,
 )
-class MappedOperator(TaskSDKMappedOperator):  # type: ignore[misc]  # It complains about weight_rule being different
+class MappedOperator(TaskSDKMappedOperator):
     """Object representing a mapped operator in a DAG."""

     deps: frozenset[BaseTIDep] = attrs.field(init=False, default=DEFAULT_OPERATOR_DEPS)

@@ -145,7 +145,7 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
         link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
         if not link:
             return None
-        return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+        return link.get_link(self, ti_key=ti.key)


 @functools.singledispatch
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/pool.py
@@ -177,7 +177,7 @@ def slots_stats(
         pool_rows = session.execute(query)
         for pool_name, total_slots, include_deferred in pool_rows:
             if total_slots == -1:
-                total_slots = float("inf")  # type: ignore
+                total_slots = float("inf")
             pools[pool_name] = PoolStats(
                 total=total_slots, running=0, queued=0, open=0, deferred=0, scheduled=0
             )
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/models/serialized_dag.py
@@ -295,13 +295,13 @@ class SerializedDagModel(Base):

     dag_runs = relationship(
         DagRun,
-        primaryjoin=dag_id == foreign(DagRun.dag_id),  # type: ignore
+        primaryjoin=dag_id == foreign(DagRun.dag_id),
         backref=backref("serialized_dag", uselist=False, innerjoin=True),
     )

     dag_model = relationship(
         DagModel,
-        primaryjoin=dag_id == DagModel.dag_id,  # type: ignore
+        primaryjoin=dag_id == DagModel.dag_id,  # type: ignore[has-type]
         foreign_keys=dag_id,
         uselist=False,
         innerjoin=True,
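Note the pattern in this file: one bare `# type: ignore` is deleted outright, while the other is narrowed to `[has-type]` (mypy's code for "Cannot determine type", which import cycles like this can trigger) so it hides only that error. Narrow codes keep `warn_unused_ignores` useful:

```python
# Bare ignore: suppresses every current and future error on the line.
x: int = "not an int"  # type: ignore
# Coded ignore: suppresses only the named class of error.
y: int = "not an int"  # type: ignore[assignment]
```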
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/models/taskinstance.py
@@ -682,7 +682,7 @@ def from_runtime_ti(cls, runtime_ti: RuntimeTaskInstanceProtocol) -> TaskInstanc
             runtime_ti.map_index = -1
         ti = TaskInstance(
             run_id=runtime_ti.run_id,
-            task=runtime_ti.task,  # type: ignore[arg-type]
+            task=runtime_ti.task,
             map_index=runtime_ti.map_index,
             dag_version_id=runtime_ti.dag_version_id,
         )

@@ -846,7 +846,7 @@ def refresh_from_task(
         self.pool_slots = task.pool_slots
         with contextlib.suppress(Exception):
             # This method is called from the different places, and sometimes the TI is not fully initialized
-            self.priority_weight = self.task.weight_rule.get_weight(self)  # type: ignore[arg-type]
+            self.priority_weight = self.task.weight_rule.get_weight(self)
         self.run_as_user = task.run_as_user
         # Do not set max_tries to task.retries here because max_tries is a cumulative
         # value that needs to be stored in the db.

@@ -1264,7 +1264,7 @@ def _check_and_change_state_before_execution(

         # Closing all pooled connections to prevent
         # "max number of connections reached"
-        settings.engine.dispose()  # type: ignore
+        settings.engine.dispose()
         if verbose:
             if mark_success:
                 cls.logger().info("Marking success for %s on %s", ti.task, ti.logical_date)

@@ -1777,7 +1777,7 @@ def handle_failure(
         if test_mode is None:
             test_mode = self.test_mode
         failure_context = TaskInstance.fetch_handle_failure_context(
-            ti=self,  # type: ignore[arg-type]
+            ti=self,
             error=error,
             test_mode=test_mode,
             session=session,

@@ -1985,7 +1985,7 @@ def render_templates(
         # able to access the unmapped task instead.
         original_task.render_template_fields(context, jinja_env)
         if isinstance(self.task, MappedOperator):
-            self.task = context["ti"].task  # type: ignore[assignment]
+            self.task = context["ti"].task

         return original_task

4 changes: 2 additions & 2 deletions airflow-core/src/airflow/providers_manager.py
@@ -396,7 +396,7 @@ def __init__(self):
         self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
         self._asset_factories: dict[str, Callable[..., Asset]] = {}
         self._asset_to_openlineage_converters: dict[str, Callable] = {}
-        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()  # type: ignore[assignment]
+        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
         # keeps mapping between connection_types and hook class, package they come from
         self._hook_provider_dict: dict[str, HookClassProvider] = {}
         self._dialect_provider_dict: dict[str, DialectInfo] = {}

@@ -447,7 +447,7 @@ def _init_airflow_core_hooks(self):
                 connection_type=None,
                 package_name="apache-airflow-providers-standard",
                 hook_class_name=class_name,
-                provider_info=None,  # type: ignore[argument]
+                provider_info=None,
             )

     @provider_info_cache("list")
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/serialization/serialized_objects.py
@@ -164,7 +164,7 @@ def encode_relativedelta(var: relativedelta.relativedelta) -> dict[str, Any]:
 def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
     """Dencode a relativedelta object."""
     if "weekday" in var:
-        var["weekday"] = relativedelta.weekday(*var["weekday"])  # type: ignore
+        var["weekday"] = relativedelta.weekday(*var["weekday"])
     return relativedelta.relativedelta(**var)

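For context, the `weekday` entry is serialized as the constructor arguments of `relativedelta.weekday`, so decoding rebuilds that object before splatting the dict. A round-trip sketch with a hand-made payload (not Airflow's exact wire format):

```python
from dateutil import relativedelta

payload = {"days": 1, "weekday": [3]}  # 3 == Thursday; optional n omitted
payload["weekday"] = relativedelta.weekday(*payload["weekday"])
rd = relativedelta.relativedelta(**payload)
print(rd)  # relativedelta(days=+1, weekday=TH)
```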

@@ -399,14 +399,14 @@ def encode_outlet_event_accessors(var: OutletEventAccessors) -> dict[str, Any]:
         "__type": DAT.ASSET_EVENT_ACCESSORS,
         "_dict": [
             {"key": BaseSerialization.serialize(k), "value": encode_outlet_event_accessor(v)}
-            for k, v in var._dict.items()  # type: ignore[attr-defined]
+            for k, v in var._dict.items()
         ],
     }


 def decode_outlet_event_accessors(var: dict[str, Any]) -> OutletEventAccessors:
-    d = OutletEventAccessors()  # type: ignore[assignment]
-    d._dict = {  # type: ignore[attr-defined]
+    d = OutletEventAccessors()
+    d._dict = {
         BaseSerialization.deserialize(row["key"]): decode_outlet_event_accessor(row["value"])
         for row in var["_dict"]
     }

@@ -1524,7 +1524,7 @@ def populate_operator(

             if v is False:
                 raise RuntimeError("_is_sensor=False should never have been serialized!")
-            object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()})  # type: ignore[union-attr]
+            object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()})
             continue
         elif (
             k in cls._decorated_fields

@@ -97,6 +97,6 @@ def _get_tzinfo_name(tzinfo: datetime.tzinfo | None) -> str | None:
         return tzinfo.name
     if hasattr(tzinfo, "zone"):
         # pytz timezone
-        return tzinfo.zone  # type: ignore[no-any-return]
+        return tzinfo.zone

     return None
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/settings.py
@@ -165,7 +165,7 @@ def replace_showwarning(replacement):
 original_show_warning = replace_showwarning(custom_show_warning)
 atexit.register(functools.partial(replace_showwarning, original_show_warning))

-POLICY_PLUGIN_MANAGER: Any = None  # type: ignore
+POLICY_PLUGIN_MANAGER: Any = None


 def task_policy(task):

@@ -417,7 +417,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
             default_args = default.copy()
             break

-    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)  # type: ignore
+    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

     if pool_class:
         # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/task/priority_strategy.py
@@ -52,7 +52,7 @@ def deserialize(cls, data: dict[str, Any]) -> PriorityWeightStrategy:
         was returned by ``serialize`` during DAG serialization. The default
         implementation constructs the priority weight strategy without any arguments.
         """
-        return cls(**data)  # type: ignore[call-arg]
+        return cls(**data)

     def serialize(self) -> dict[str, Any]:
         """
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/deprecation_tools.py
@@ -178,7 +178,7 @@ def add_deprecated_classes(
         override_deprecated_classes_for_module = {}

     # Mypy is not able to derive the right function signature https://github.com/python/mypy/issues/2427
-    module_type.__getattr__ = functools.partial(  # type: ignore[assignment]
+    module_type.__getattr__ = functools.partial(  # type: ignore[method-assign]
         getattr_with_deprecation,
         imports,
         full_module_name,
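`[method-assign]` is the narrower code recent mypy uses when something is assigned over a method-like attribute (older releases lumped this under `[assignment]`). A sketch of the PEP 562 trick being typed here; the helper's signature is an assumption, only its name comes from the diff:

```python
import functools
import types


def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
    # Hypothetical stand-in for Airflow's helper of the same name.
    raise AttributeError(f"module {module!r} has no attribute {name!r}")


mod = types.ModuleType("legacy_shim")
# Overwriting a method slot is what mypy calls [method-assign].
mod.__getattr__ = functools.partial(getattr_with_deprecation, {}, "legacy_shim")  # type: ignore[method-assign]
```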
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/entry_points.py
@@ -25,7 +25,7 @@
 if sys.version_info >= (3, 12):
     from importlib import metadata
 else:
-    import importlib_metadata as metadata  # type: ignore[no-redef]
+    import importlib_metadata as metadata

 log = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/hashlib_wrapper.py
@@ -31,4 +31,4 @@
     :param string: The data to hash. Default to empty str byte.
     :return: The hashed value.
     """
-    return hashlib.md5(string, usedforsecurity=False)  # type: ignore
+    return hashlib.md5(string, usedforsecurity=False)
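`usedforsecurity` has been an accepted keyword for `hashlib.md5` since Python 3.9 and is typed in current typeshed, so the bare ignore (presumably a leftover from older minimum Python versions) could simply go:

```python
import hashlib

# usedforsecurity=False marks the digest as non-cryptographic, which keeps
# it usable on FIPS-restricted builds.
print(hashlib.md5(b"airflow", usedforsecurity=False).hexdigest())
```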