Skip to content

Commit 5945451

Browse files
committed
recommit without precommit changes
1 parent c0f5328 commit 5945451

File tree

4 files changed

+197
-3
lines changed

4 files changed

+197
-3
lines changed

.pre-commit-config.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,4 +1519,12 @@ repos:
15191519
additional_dependencies: ['rich>=12.4.4', 'ruff==0.11.13']
15201520
files: ^providers/.*/src/airflow/providers/.*version_compat.*\.py$
15211521
require_serial: true
1522+
- id: verify-signatures
1523+
name: Verify if decorator signatures added in __init__.pyi is not missing any valid params of the operator
1524+
language: python
1525+
entry: ./scripts/ci/pre_commit/verify_signature_consistency.py
1526+
pass_filenames: false
1527+
additional_dependencies: ['rich>=12.4.4']
1528+
always_run: true
1529+
files: ^(providers/.*/)?airflow/.*/(sensors|operators)/.*\.py$
15221530
## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/usr/bin/env python
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
from __future__ import annotations
21+
22+
import sys
23+
from pathlib import Path
24+
25+
sys.path.insert(0, str(Path(__file__).parent.resolve()))
26+
from common_precommit_utils import (
27+
initialize_breeze_precommit,
28+
run_command_via_breeze_shell,
29+
validate_cmd_result,
30+
)
31+
32+
initialize_breeze_precommit(__name__, __file__)
33+
34+
cmd_result = run_command_via_breeze_shell(
35+
["python3", "/opt/airflow/scripts/in_container/run_signature_consistency_verify.py"],
36+
backend="sqlite",
37+
skip_environment_initialization=False,
38+
)
39+
40+
validate_cmd_result(cmd_result)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/usr/bin/env python
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
from __future__ import annotations
20+
21+
import inspect
22+
import sys
23+
24+
import libcst as cst
25+
from in_container_utils import AIRFLOW_ROOT_PATH, console
26+
27+
DECORATOR_OPERATOR_MAP = {
28+
"kubernetes": "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator",
29+
"sensor": "airflow.sdk.bases.sensor.BaseSensorOperator",
30+
"virtualenv": "airflow.providers.standard.operators.python.PythonVirtualenvOperator",
31+
"branch_virtualenv": "airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator",
32+
# Add more here...
33+
}
34+
DECORATOR_PYI_PATH = (
35+
AIRFLOW_ROOT_PATH / "task-sdk" / "src" / "airflow" / "sdk" / "definitions" / "decorators" / "__init__.pyi"
36+
)
37+
decorator_pyi_file_content = DECORATOR_PYI_PATH.read_text()
38+
39+
40+
def extract_function_params(code, function_name, return_type):
41+
"""Extracts parameters from a specific function definition in the given code.
42+
43+
Args:
44+
code (str): The Python code to parse.
45+
function_name (str): The name of the function to extract parameters from.
46+
return_type (str): As the pyi file has multiple @overload decorator, extract function param based on return type.
47+
48+
Returns:
49+
list: A list of parameter names, or None if the function is not found.
50+
"""
51+
module = cst.parse_module(code)
52+
53+
class FunctionParamExtractor(cst.CSTVisitor):
54+
def __init__(self, target_function_name, target_return_type):
55+
self.target_function_name = target_function_name
56+
self.target_return_type = target_return_type
57+
self.params: list[str] = []
58+
59+
def visit_FunctionDef(self, node):
60+
# Match function name
61+
if node.name.value == self.target_function_name:
62+
if node.returns:
63+
annotation = node.returns.annotation
64+
if isinstance(annotation, cst.Name) and annotation.value == self.target_return_type:
65+
parameters_node = node.params
66+
self.params.extend(param.name.value for param in parameters_node.params)
67+
self.params.extend(param.name.value for param in parameters_node.kwonly_params)
68+
self.params.extend(param.name.value for param in parameters_node.posonly_params)
69+
if parameters_node.star_kwarg:
70+
self.params.append(parameters_node.star_kwarg.name.value)
71+
return False # Stop traversing after finding the real function
72+
return True # Keep traversing
73+
74+
extractor = FunctionParamExtractor(function_name, return_type)
75+
module.visit(extractor)
76+
return extractor.params
77+
78+
79+
def get_decorator_params(decorator_name: str):
80+
params = extract_function_params(decorator_pyi_file_content, decorator_name, "TaskDecorator")
81+
return set(params)
82+
83+
84+
def get_operator_params(operator_path: str):
85+
console.print("Operator path:", operator_path)
86+
module_path, class_name = operator_path.rsplit(".", 1)
87+
module = __import__(module_path, fromlist=[class_name])
88+
operator_cls = getattr(module, class_name)
89+
sig = inspect.signature(operator_cls.__init__)
90+
return set(p for p in sig.parameters.keys() if p not in ("self", "args", "kwargs"))
91+
92+
93+
def verify_signature_consistency():
94+
failure = False
95+
console.print("Verify signature consistency")
96+
for decorator, operator_path in DECORATOR_OPERATOR_MAP.items():
97+
decorator_params = get_decorator_params(decorator)
98+
operator_params = get_operator_params(operator_path)
99+
missing_in_decorator = operator_params - decorator_params
100+
101+
ignored = {"kwargs", "args", "self"}
102+
missing_in_decorator -= ignored
103+
if missing_in_decorator:
104+
failure = True
105+
console.print("Following list of params are missing in __init__.py[/]:", missing_in_decorator)
106+
if failure:
107+
console.print("[red]Some of the decorator signatures are missing in __init__.py[/]")
108+
sys.exit(1)
109+
console.print("[green]All decorator signature matches[/]")
110+
sys.exit(0)
111+
112+
113+
if __name__ == "__main__":
114+
verify_signature_consistency()

task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ from typing import Any, Callable, TypeVar, overload
2727
from docker.types import Mount
2828
from kubernetes.client import models as k8s
2929

30+
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
3031
from airflow.providers.cncf.kubernetes.secret import Secret
3132
from airflow.sdk.bases.decorator import FParams, FReturn, Task, TaskDecorator, _TaskDecorator
3233
from airflow.sdk.definitions.dag import dag
@@ -96,16 +97,21 @@ class TaskDecoratorCollection:
9697
multiple_outputs: bool | None = None,
9798
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
9899
# _PythonVirtualenvDecoratedOperator.
100+
python_callable: Callable,
101+
op_args: Collection[Any] | None = None,
102+
op_kwargs: Mapping[str, Any] | None = None,
99103
requirements: None | Iterable[str] | str = None,
100104
python_version: None | str | int | float = None,
101105
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
102106
system_site_packages: bool = True,
103107
templates_dict: Mapping[str, Any] | None = None,
108+
templates_exts: list[str] | None = None,
104109
pip_install_options: list[str] | None = None,
110+
expect_airflow: bool = True,
105111
skip_on_exit_code: int | Container[int] | None = None,
106112
index_urls: None | Collection[str] | str = None,
107113
venv_cache_path: None | str = None,
108-
show_return_value_in_logs: bool = True,
114+
string_args: Iterable[str] | None = None,
109115
env_vars: dict[str, str] | None = None,
110116
inherit_env: bool = True,
111117
**kwargs,
@@ -218,18 +224,24 @@ class TaskDecoratorCollection:
218224
self,
219225
*,
220226
multiple_outputs: bool | None = None,
221-
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
222-
# _PythonVirtualenvDecoratedOperator.
227+
python_callable: Callable,
223228
requirements: None | Iterable[str] | str = None,
229+
op_args: Collection[Any] | None = None,
230+
op_kwargs: Mapping[str, Any] | None = None,
231+
string_args: Iterable[str] | None = None,
224232
python_version: None | str | int | float = None,
225233
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
226234
system_site_packages: bool = True,
227235
templates_dict: Mapping[str, Any] | None = None,
236+
templates_exts: list[str] | None = None,
228237
pip_install_options: list[str] | None = None,
229238
skip_on_exit_code: int | Container[int] | None = None,
230239
index_urls: None | Collection[str] | str = None,
231240
venv_cache_path: None | str = None,
241+
expect_airflow: bool = True,
232242
show_return_value_in_logs: bool = True,
243+
env_vars: dict[str, str] | None = None,
244+
inherit_env: bool = True,
233245
**kwargs,
234246
) -> TaskDecorator:
235247
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
@@ -507,6 +519,7 @@ class TaskDecoratorCollection:
507519
image: str | None = None,
508520
name: str | None = None,
509521
random_name_suffix: bool = ...,
522+
cmds: list[str] | None = None,
510523
arguments: list[str] | None = None,
511524
ports: list[k8s.V1ContainerPort] | None = None,
512525
volume_mounts: list[k8s.V1VolumeMount] | None = None,
@@ -520,6 +533,7 @@ class TaskDecoratorCollection:
520533
reattach_on_restart: bool = ...,
521534
startup_timeout_seconds: int = ...,
522535
startup_check_interval_seconds: int = ...,
536+
schedule_timeout_seconds: int | None = None,
523537
get_logs: bool = True,
524538
container_logs: Iterable[str] | str | Literal[True] = ...,
525539
image_pull_policy: str | None = None,
@@ -530,6 +544,7 @@ class TaskDecoratorCollection:
530544
node_selector: dict | None = None,
531545
image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
532546
service_account_name: str | None = None,
547+
automount_service_account_token: bool | None = None,
533548
hostnetwork: bool = False,
534549
host_aliases: list[k8s.V1HostAlias] | None = None,
535550
tolerations: list[k8s.V1Toleration] | None = None,
@@ -553,13 +568,20 @@ class TaskDecoratorCollection:
553568
skip_on_exit_code: int | Container[int] | None = None,
554569
base_container_name: str | None = None,
555570
base_container_status_polling_interval: float = ...,
571+
init_container_logs: Iterable[str] | str | Literal[True] | None = None,
556572
deferrable: bool = ...,
557573
poll_interval: float = ...,
558574
log_pod_spec_on_failure: bool = ...,
559575
on_finish_action: str = ...,
576+
is_delete_operator_pod: None | bool = None,
560577
termination_message_policy: str = ...,
561578
active_deadline_seconds: int | None = None,
579+
callbacks: (
580+
list[type[KubernetesPodOperatorCallback]] | type[KubernetesPodOperatorCallback] | None
581+
) = None,
562582
progress_callback: Callable[[str], None] | None = None,
583+
logging_interval: int | None = None,
584+
trigger_kwargs: dict | None = None,
563585
**kwargs,
564586
) -> TaskDecorator:
565587
"""Create a decorator to convert a callable to a Kubernetes Pod task.
@@ -666,6 +688,7 @@ class TaskDecoratorCollection:
666688
:param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds
667689
in V1PodSpec.
668690
:param progress_callback: Callback function for receiving k8s container logs.
691+
:param trigger_kwargs: additional keyword parameters passed to the trigger
669692
"""
670693
@overload
671694
def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -849,6 +872,8 @@ class TaskDecoratorCollection:
849872
mode: str = ...,
850873
exponential_backoff: bool = False,
851874
max_wait: timedelta | float | None = None,
875+
silent_fail: bool = False,
876+
never_fail: bool = False,
852877
**kwargs,
853878
) -> TaskDecorator:
854879
"""
@@ -873,6 +898,13 @@ class TaskDecoratorCollection:
873898
:param exponential_backoff: allow progressive longer waits between
874899
pokes by using exponential backoff algorithm
875900
:param max_wait: maximum wait interval between pokes, can be ``timedelta`` or ``float`` seconds
901+
:param silent_fail: If true, and poke method raises an exception different from
902+
AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException
903+
and AirflowFailException, the sensor will log the error and continue
904+
its execution. Otherwise, the sensor task fails, and it can be retried
905+
based on the provided `retries` parameter.
906+
:param never_fail: If true, and poke method raises an exception, sensor will be skipped.
907+
Mutually exclusive with soft_fail.
876908
"""
877909
@overload
878910
def sensor(self, python_callable: Callable[FParams, FReturn] | None = None) -> Task[FParams, FReturn]: ...

0 commit comments

Comments
 (0)