diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 790cc33fa213d..1e708cf983987 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1600,4 +1600,12 @@ repos:
types: [python]
files: ^providers/.*/src/airflow/providers/.*\.py$
require_serial: true
+ - id: verify-signatures
+ name: Verify task decorators are not missing operator params
+ language: python
+ entry: ./scripts/ci/pre_commit/verify_signature_consistency.py
+ pass_filenames: false
+ additional_dependencies: ['rich>=12.4.4']
+ always_run: true
+ files: ^(providers/.*/)?airflow/.*/(sensors|operators)/.*\.py$
## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE
diff --git a/contributing-docs/08_static_code_checks.rst b/contributing-docs/08_static_code_checks.rst
index 70569c1a8f393..ed7820d0eab63 100644
--- a/contributing-docs/08_static_code_checks.rst
+++ b/contributing-docs/08_static_code_checks.rst
@@ -419,6 +419,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------+---------+
| validate-operators-init | No templated field logic checks in operator __init__ | |
+-----------------------------------------------------------+--------------------------------------------------------+---------+
+| verify-signatures | Verify task decorators are not missing operator params | * |
++-----------------------------------------------------------+--------------------------------------------------------+---------+
| yamllint | Check YAML files with yamllint | |
+-----------------------------------------------------------+--------------------------------------------------------+---------+
| zizmor | Run zizmor to check for github workflow syntax errors | |
diff --git a/dev/breeze/doc/images/output-commands.svg b/dev/breeze/doc/images/output-commands.svg
index 9b5bd8dd912c6..2eedafd11184b 100644
--- a/dev/breeze/doc/images/output-commands.svg
+++ b/dev/breeze/doc/images/output-commands.svg
@@ -335,8 +335,8 @@
│kafka | kerberos | keycloak | mongo | mssql | │
│openlineage | otel | pinot | qdrant | redis | redis | │
│statsd | tinkerpop | trino | ydb) │
-│--standalone-dag-processor/--no-standalone-dag-processoRun standalone dag processor for start-airflow │
-│r(required for Airflow 3). │
+│--standalone-dag-processor/--no-standalone-dag-process…Run standalone dag processor for start-airflow │
+│(required for Airflow 3). │
│[default: standalone-dag-processor] │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Docker Compose selection and cleanup ───────────────────────────────────────────────────────────────────────────────╮
diff --git a/dev/breeze/doc/images/output_setup.svg b/dev/breeze/doc/images/output_setup.svg
index c747a1eea7f38..5dda408adefbc 100644
--- a/dev/breeze/doc/images/output_setup.svg
+++ b/dev/breeze/doc/images/output_setup.svg
@@ -110,7 +110,7 @@
Tools that developers can use to configure Breeze
╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│--help-hShow this message and exit.│
+│--help-hShow this message and exit.│
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Setup ──────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│autocomplete Enables autocompletion of breeze commands. │
diff --git a/dev/breeze/doc/images/output_setup.txt b/dev/breeze/doc/images/output_setup.txt
index 21f3c7e2527e0..274751197daaf 100644
--- a/dev/breeze/doc/images/output_setup.txt
+++ b/dev/breeze/doc/images/output_setup.txt
@@ -1 +1 @@
-ba00ab3fb2ed5a777684878c28b3ce65
+08c78d9dddd037a2ade6b751c5a22ff9
diff --git a/dev/breeze/doc/images/output_setup_regenerate-command-images.txt b/dev/breeze/doc/images/output_setup_regenerate-command-images.txt
index 14f64f8fcb257..83d01f096b85a 100644
--- a/dev/breeze/doc/images/output_setup_regenerate-command-images.txt
+++ b/dev/breeze/doc/images/output_setup_regenerate-command-images.txt
@@ -1 +1 @@
-b6bec540dff082eb353c54319297ed94
+a737e824010a74c63551f66433e0e7af
diff --git a/dev/breeze/doc/images/output_static-checks.svg b/dev/breeze/doc/images/output_static-checks.svg
index 0857d08d9d606..fbd6db6032b03 100644
--- a/dev/breeze/doc/images/output_static-checks.svg
+++ b/dev/breeze/doc/images/output_static-checks.svg
@@ -389,7 +389,7 @@
│update-pyproject-toml | update-reproducible-source-date-epoch | │
│update-spelling-wordlist-to-be-sorted | update-supported-versions | │
│update-vendored-in-k8s-json-schema | update-version | validate-chart-annotations │
-│| validate-operators-init | yamllint | zizmor) │
+│| validate-operators-init | verify-signatures | yamllint | zizmor) │
│--show-diff-on-failure-sShow diff for files modified by the checks.│
│--initialize-environmentInitialize environment before running checks.│
│--max-initialization-attemptsMaximum number of attempts to initialize environment before giving up.│
diff --git a/dev/breeze/doc/images/output_static-checks.txt b/dev/breeze/doc/images/output_static-checks.txt
index c4ea9001bac34..16d3baeccecd4 100644
--- a/dev/breeze/doc/images/output_static-checks.txt
+++ b/dev/breeze/doc/images/output_static-checks.txt
@@ -1 +1 @@
-c3e2295a3b4cfc4073a1c8bea4ee4f3b
+bdafa62779f00673ec0cd960a8a43e1e
diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
index a9d1b62c0b8ac..5983e6a65cf0f 100644
--- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py
+++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
@@ -162,6 +162,7 @@
"update-version",
"validate-chart-annotations",
"validate-operators-init",
+ "verify-signatures",
"yamllint",
"zizmor",
]
diff --git a/scripts/ci/pre_commit/verify_signature_consistency.py b/scripts/ci/pre_commit/verify_signature_consistency.py
new file mode 100755
index 0000000000000..f3d102d21876a
--- /dev/null
+++ b/scripts/ci/pre_commit/verify_signature_consistency.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent.resolve()))
+from common_precommit_utils import (
+ initialize_breeze_precommit,
+ run_command_via_breeze_shell,
+ validate_cmd_result,
+)
+
+initialize_breeze_precommit(__name__, __file__)
+
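+# The actual check runs inside the Breeze CI container, where both the Task SDK
+# stubs and the provider operator modules are importable.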
+cmd_result = run_command_via_breeze_shell(
+ ["python3", "/opt/airflow/scripts/in_container/run_signature_consistency_verify.py"],
+ backend="sqlite",
+ skip_environment_initialization=False,
+)
+
+validate_cmd_result(cmd_result)
diff --git a/scripts/in_container/run_signature_consistency_verify.py b/scripts/in_container/run_signature_consistency_verify.py
new file mode 100644
index 0000000000000..5b011b0a11d3c
--- /dev/null
+++ b/scripts/in_container/run_signature_consistency_verify.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import inspect
+import sys
+
+import libcst as cst
+from in_container_utils import AIRFLOW_ROOT_PATH, console
+
+DECORATOR_OPERATOR_MAP = {
+ "kubernetes": "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator",
+ "sensor": "airflow.sdk.bases.sensor.BaseSensorOperator",
+ "virtualenv": "airflow.providers.standard.operators.python.PythonVirtualenvOperator",
+ "branch_virtualenv": "airflow.providers.standard.operators.python.PythonVirtualenvOperator",
+ "bash": "airflow.providers.standard.operators.bash.BashOperator",
+ "short_circuit": "airflow.providers.standard.operators.python.ShortCircuitOperator",
+ "python": "airflow.providers.standard.operators.python.PythonOperator",
+ "external_python": "airflow.providers.standard.operators.python.ExternalPythonOperator",
+ # Add more here...
+}
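+# Each entry maps a ``@task.<name>`` decorator overload declared in the Task SDK
+# stub to the operator class whose ``__init__`` parameters it must mirror, e.g.
+# ``@task.bash`` mirrors ``BashOperator``.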
+DECORATOR_PYI_PATH = (
+ AIRFLOW_ROOT_PATH / "task-sdk" / "src" / "airflow" / "sdk" / "definitions" / "decorators" / "__init__.pyi"
+)
+decorator_pyi_file_content = DECORATOR_PYI_PATH.read_text()
+STOP_CLASSES = {"airflow.models.baseoperator.BaseOperator", "airflow.sdk.bases.operator.BaseOperator"}
+
+
+def extract_function_params(code, function_name, return_type):
+ """Extracts parameters from a specific function definition in the given code.
+
+ Args:
+ code (str): The Python code to parse.
+ function_name (str): The name of the function to extract parameters from.
+ return_type (str): Because the .pyi file contains multiple ``@overload`` definitions of each decorator, the target overload is also matched by its return type.
+
+ Returns:
+ list: A list of parameter names; empty if no matching overload is found.
+ """
+ module = cst.parse_module(code)
+
+ class FunctionParamExtractor(cst.CSTVisitor):
+ def __init__(self, target_function_name, target_return_type):
+ self.target_function_name = target_function_name
+ self.target_return_type = target_return_type
+ self.params: list[str] = []
+
+ def visit_FunctionDef(self, node):
+ # Match function name
+ if node.name.value == self.target_function_name:
+ if node.returns:
+ annotation = node.returns.annotation
+ if isinstance(annotation, cst.Name) and annotation.value == self.target_return_type:
+ parameters_node = node.params
+ self.params.extend(param.name.value for param in parameters_node.params)
+ self.params.extend(param.name.value for param in parameters_node.kwonly_params)
+ self.params.extend(param.name.value for param in parameters_node.posonly_params)
+ if parameters_node.star_kwarg:
+ self.params.append(parameters_node.star_kwarg.name.value)
+ return False  # Matched the target name; do not descend into this function's body
+ return True # Keep traversing
+
+ extractor = FunctionParamExtractor(function_name, return_type)
+ module.visit(extractor)
+ return extractor.params
+
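+# Illustrative example (hypothetical stub snippet): given
+#     @overload
+#     def bash(self, *, bash_command: str, **kwargs) -> TaskDecorator: ...
+# ``extract_function_params(code, "bash", "TaskDecorator")`` returns
+# ``["self", "bash_command", "kwargs"]``.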
+
+def get_decorator_params(decorator_name: str):
+ params = extract_function_params(decorator_pyi_file_content, decorator_name, "TaskDecorator")
+ return set(params)
+
+
+def get_operator_params(operator_path: str) -> set[str]:
+ module_path, class_name = operator_path.rsplit(".", 1)
+ module = __import__(module_path, fromlist=[class_name])
+ operator_cls = getattr(module, class_name)
+ console.print(f"operator_cls: {operator_cls}")
+ all_params: set[str] = set()
+ for cls in inspect.getmro(operator_cls):
+ full_class_path = f"{cls.__module__}.{cls.__qualname__}"
+ if full_class_path in STOP_CLASSES:
+ break
+ if cls is object:
+ continue # Skip base object
+ try:
+ sig = inspect.signature(cls)
+ except (ValueError, TypeError):
+ console.print(f"[red]Could not inspect: {cls}[/]")
+ continue
+ all_params.update(p for p in sig.parameters.keys() if p not in ("self", "args", "kwargs"))
+ console.print("[green]Extracted params:[/] ", all_params)
+ return all_params
+
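+# For example, for ``BashOperator`` the MRO walk collects ``__init__`` parameters
+# such as ``bash_command`` and ``env`` from the operator itself and stops at
+# ``BaseOperator``, so base parameters like ``task_id`` are never demanded of the
+# decorator stubs.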
+
+def verify_signature_consistency():
+ failure = False
+ console.print("Verify signature consistency")
+ for decorator, operator_path in DECORATOR_OPERATOR_MAP.items():
+ decorator_params = get_decorator_params(decorator)
+ operator_params = get_operator_params(operator_path)
+ missing_in_decorator = operator_params - decorator_params
+
+ ignored = {"kwargs", "args", "self", "python_callable", "op_args", "op_kwargs"}
+ missing_in_decorator -= ignored
+ if missing_in_decorator:
+ failure = True
+ console.print(
+ f"[yellow]Missing params in[/] [bold]__init__.py[/] for {decorator}: {missing_in_decorator}"
+ )
+ if failure:
+ console.print("[red]Some of the decorator signatures are missing in __init__.py[/]")
+ sys.exit(1)
+ console.print("[green]All decorator signature matches[/]")
+ sys.exit(0)
+
+
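+# Entry point for the ``verify-signatures`` pre-commit hook, which runs this file
+# inside the Breeze container via scripts/ci/pre_commit/verify_signature_consistency.py.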
+if __name__ == "__main__":
+ verify_signature_consistency()
diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
index c9a6d9956bb64..41618c9ba2154 100644
--- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
@@ -22,11 +22,12 @@ from __future__ import annotations
from collections.abc import Callable, Collection, Container, Iterable, Mapping
from datetime import timedelta
-from typing import Any, TypeVar, overload
+from typing import TYPE_CHECKING, Any, TypeVar, overload
from docker.types import Mount
from kubernetes.client import models as k8s
+from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.sdk.bases.decorator import FParams, FReturn, Task, TaskDecorator, _TaskDecorator
from airflow.sdk.definitions.dag import dag
@@ -34,6 +35,8 @@ from airflow.sdk.definitions.decorators.condition import AnyConditionFunc
from airflow.sdk.definitions.decorators.task_group import task_group
from airflow.typing_compat import Literal
+if TYPE_CHECKING:
+ from airflow.utils.types import ArgNotSet
# Please keep this in sync with __init__.py's __all__.
__all__ = [
"TaskDecorator",
@@ -56,6 +59,7 @@ class TaskDecoratorCollection:
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonDecoratedOperator.
templates_dict: Mapping[str, Any] | None = None,
+ templates_exts: list[str] | None = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
@@ -67,6 +71,8 @@ class TaskDecoratorCollection:
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied.
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for example ``['.sql', '.hql']``
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
@@ -101,11 +107,15 @@ class TaskDecoratorCollection:
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
+ templates_exts: list[str] | None = None,
pip_install_options: list[str] | None = None,
+ expect_airflow: bool = True,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
+ index_urls_from_connection_ids: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
+ string_args: Iterable[str] | None = None,
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
**kwargs,
@@ -135,6 +145,8 @@ class TaskDecoratorCollection:
exit code will be treated as a failure.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
+ :param index_urls_from_connection_ids: An optional list of ``PackageIndex`` connection IDs.
+ Will be appended to ``index_urls``.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be
replaced with a checksum of requirements. If not provided the virtual environment will be
@@ -143,6 +155,8 @@ class TaskDecoratorCollection:
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied.
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for example ``['.sql', '.hql']``
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
@@ -163,11 +177,16 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
+ string_args: Iterable[str] | None = None,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
+ templates_exts: list[str] | None = None,
show_return_value_in_logs: bool = True,
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
+ expect_airflow: bool = True,
+ expect_pendulum: bool = False,
+ skip_on_exit_code: int | Container[int] | None = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
@@ -188,16 +207,26 @@ class TaskDecoratorCollection:
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied.
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for example ``['.sql', '.hql']``
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
+ :param string_args: Strings that are present in the global var virtualenv_string_args,
+ available to python_callable at runtime as a list[str]. Note that args are split
+ by newline.
:param env_vars: A dictionary containing additional environment variables to set for the virtual
environment when it is executed.
:param inherit_env: Whether to inherit the current environment variables when executing the virtual
environment. If set to ``True``, the virtual environment will inherit the environment variables
of the parent process (``os.environ``). If set to ``False``, the virtual environment will be
executed with a clean environment.
+ :param expect_airflow: expect Airflow to be installed in the target environment. If true,
+ the operator will raise a warning if Airflow is not installed, and it will attempt to
+ load Airflow macros when starting.
+ :param expect_pendulum: If set to true, check whether pendulum is properly installed in the
+ virtual environment.
+ :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
+ in ``skipped`` state (default: None). If set to ``None``, any non-zero
+ exit code will be treated as a failure.
"""
@overload
def branch( # type: ignore[misc]
@@ -221,15 +250,21 @@ class TaskDecoratorCollection:
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
+ string_args: Iterable[str] | None = None,
python_version: None | str | int | float = None,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
+ templates_exts: list[str] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
+ index_urls_from_connection_ids: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
+ expect_airflow: bool = True,
show_return_value_in_logs: bool = True,
+ env_vars: dict[str, str] | None = None,
+ inherit_env: bool = True,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
@@ -253,13 +288,24 @@ class TaskDecoratorCollection:
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
+ :param templates_dict: a dictionary where the values are templates that
+ will get templated by the Airflow engine sometime between
+ ``__init__`` and ``execute`` takes place and are made available
+ in your callable's context after the template has been applied.
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for example ``['.sql', '.hql']``
:param pip_install_options: a list of pip install options when installing requirements
See 'pip install -h' for available options
:param skip_on_exit_code: If python_callable exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
+ :param string_args: Strings that are present in the global var virtualenv_string_args,
+ available to python_callable at runtime as a list[str]. Note that args are split
+ by newline.
:param index_urls: an optional list of index urls to load Python packages from.
If not provided the system pip conf will be used to source packages from.
+ :param index_urls_from_connection_ids: An optional list of ``PackageIndex`` connection IDs.
+ Will be appended to ``index_urls``.
:param venv_cache_path: Optional path to the virtual environment parent folder in which the
virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced
with a checksum of requirements. If not provided the virtual environment will be created and
@@ -320,6 +366,9 @@ class TaskDecoratorCollection:
*,
multiple_outputs: bool | None = None,
ignore_downstream_trigger_rules: bool = True,
+ show_return_value_in_logs: bool = True,
+ templates_dict: Mapping[str, Any] | None = None,
+ templates_exts: list[str] | None = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a ShortCircuitOperator.
@@ -330,6 +379,16 @@ class TaskDecoratorCollection:
will be skipped. This is the default behavior. If set to False, the direct, downstream task(s)
will be skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected.
Defaults to True.
+ :param show_return_value_in_logs: a bool value whether to show return_value
+ logs. Defaults to True, which allows return value log output.
+ It can be set to False to prevent log output of the return value when you
+ return huge data, such as transmitting a large amount of XCom to TaskAPI.
+ :param templates_dict: a dictionary where the values are templates that
+ will get templated by the Airflow engine sometime between
+ ``__init__`` and ``execute`` takes place and are made available
+ in your callable's context after the template has been applied.
+ :param templates_exts: a list of file extensions to resolve while
+ processing templated fields, for example ``['.sql', '.hql']``
"""
@overload
def short_circuit(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -507,6 +566,7 @@ class TaskDecoratorCollection:
image: str | None = None,
name: str | None = None,
random_name_suffix: bool = ...,
+ cmds: list[str] | None = None,
arguments: list[str] | None = None,
ports: list[k8s.V1ContainerPort] | None = None,
volume_mounts: list[k8s.V1VolumeMount] | None = None,
@@ -520,6 +580,7 @@ class TaskDecoratorCollection:
reattach_on_restart: bool = ...,
startup_timeout_seconds: int = ...,
startup_check_interval_seconds: int = ...,
+ schedule_timeout_seconds: int | None = None,
get_logs: bool = True,
container_logs: Iterable[str] | str | Literal[True] = ...,
image_pull_policy: str | None = None,
@@ -530,6 +591,7 @@ class TaskDecoratorCollection:
node_selector: dict | None = None,
image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
service_account_name: str | None = None,
+ automount_service_account_token: bool | None = None,
hostnetwork: bool = False,
host_aliases: list[k8s.V1HostAlias] | None = None,
tolerations: list[k8s.V1Toleration] | None = None,
@@ -553,13 +615,20 @@ class TaskDecoratorCollection:
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
base_container_status_polling_interval: float = ...,
+ init_container_logs: Iterable[str] | str | Literal[True] | None = None,
deferrable: bool = ...,
poll_interval: float = ...,
log_pod_spec_on_failure: bool = ...,
on_finish_action: str = ...,
+ is_delete_operator_pod: None | bool = None,
termination_message_policy: str = ...,
active_deadline_seconds: int | None = None,
+ callbacks: (
+ list[type[KubernetesPodOperatorCallback]] | type[KubernetesPodOperatorCallback] | None
+ ) = None,
progress_callback: Callable[[str], None] | None = None,
+ logging_interval: int | None = None,
+ trigger_kwargs: dict | None = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert a callable to a Kubernetes Pod task.
@@ -666,6 +735,7 @@ class TaskDecoratorCollection:
:param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds
in V1PodSpec.
:param progress_callback: Callback function for receiving k8s container logs.
+ :param trigger_kwargs: additional keyword parameters passed to the trigger
"""
@overload
def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -849,6 +919,8 @@ class TaskDecoratorCollection:
mode: str = ...,
exponential_backoff: bool = False,
max_wait: timedelta | float | None = None,
+ silent_fail: bool = False,
+ never_fail: bool = False,
**kwargs,
) -> TaskDecorator:
"""
@@ -873,6 +945,13 @@ class TaskDecoratorCollection:
:param exponential_backoff: allow progressive longer waits between
pokes by using exponential backoff algorithm
:param max_wait: maximum wait interval between pokes, can be ``timedelta`` or ``float`` seconds
+ :param silent_fail: If true, and poke method raises an exception different from
+ AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException
+ and AirflowFailException, the sensor will log the error and continue
+ its execution. Otherwise, the sensor task fails, and it can be retried
+ based on the provided `retries` parameter.
+ :param never_fail: If true, and poke method raises an exception, sensor will be skipped.
+ Mutually exclusive with soft_fail.
"""
@overload
def sensor(self, python_callable: Callable[FParams, FReturn] | None = None) -> Task[FParams, FReturn]: ...
@@ -902,11 +981,13 @@ class TaskDecoratorCollection:
def bash( # type: ignore[misc]
self,
*,
+ bash_command: str | ArgNotSet,
env: dict[str, str] | None = None,
append_env: bool = False,
output_encoding: str = "utf-8",
skip_on_exit_code: int = 99,
cwd: str | None = None,
+ output_processor: Callable[[str], Any] = ...,
**kwargs,
) -> TaskDecorator:
"""Decorator to wrap a callable into a BashOperator task.
@@ -925,6 +1006,8 @@ class TaskDecoratorCollection:
(default: 99). If set to ``None``, any non-zero exit code will be treated as a failure.
:param cwd: Working directory to execute the command in. If None (default), the command is run in a
temporary directory.
+ :param output_processor: Function to further process the output of the bash script
+ (default is the identity function, ``lambda output: output``).
"""
@overload
def bash(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...