|
22 | 22 |
|
23 | 23 | from airflow.sdk.definitions.asset import Asset
|
24 | 24 | from airflow.sdk.definitions.asset.decorators import _AssetMainOperator, asset
|
| 25 | +from airflow.sdk.definitions.decorators import task |
25 | 26 | from airflow.sdk.execution_time.comms import AssetResult, GetAssetByName
|
26 | 27 |
|
27 | 28 |
|
@@ -159,6 +160,17 @@ def example_asset_func(self, /):
|
159 | 160 | == "positional-only argument 'self' without a default is not supported in @asset"
|
160 | 161 | )
|
161 | 162 |
|
| 163 | + def test_with_task_decorator(self, func_fixer): |
| 164 | + @task(retries=3) |
| 165 | + @func_fixer |
| 166 | + def _example_task_func(): |
| 167 | + return "This is example_task" |
| 168 | + |
| 169 | + asset_definition = asset(name="asset", dag_id="dag", schedule=None)(_example_task_func) |
| 170 | + assert asset_definition.name == "asset" |
| 171 | + assert asset_definition._source.dag_id == "dag" |
| 172 | + assert asset_definition._function == _example_task_func |
| 173 | + |
162 | 174 | @pytest.mark.parametrize(
|
163 | 175 | "provided_uri, expected_uri",
|
164 | 176 | [
|
@@ -222,6 +234,36 @@ def test__attrs_post_init__(self, DAG, from_definition, example_asset_func_with_
|
222 | 234 | )
|
223 | 235 | from_definition.assert_called_once_with(asset_definition)
|
224 | 236 |
|
| 237 | + @mock.patch("airflow.sdk.bases.decorator._TaskDecorator.__call__") |
| 238 | + @mock.patch("airflow.sdk.definitions.dag.DAG") |
| 239 | + def test_with_task_decorator(self, DAG, __call__, func_fixer): |
| 240 | + @task(retries=3) |
| 241 | + @func_fixer |
| 242 | + def _example_task_func(): |
| 243 | + return "This is example_task" |
| 244 | + |
| 245 | + asset_definition = asset(schedule=None, uri="s3://bucket/object", group="MLModel", extra={"k": "v"})( |
| 246 | + _example_task_func |
| 247 | + ) |
| 248 | + |
| 249 | + DAG.assert_called_once_with( |
| 250 | + dag_id="example_asset_func", |
| 251 | + dag_display_name="example_asset_func", |
| 252 | + description=None, |
| 253 | + schedule=None, |
| 254 | + catchup=False, |
| 255 | + is_paused_upon_creation=None, |
| 256 | + on_failure_callback=None, |
| 257 | + on_success_callback=None, |
| 258 | + params=None, |
| 259 | + access_control=None, |
| 260 | + owner_links={}, |
| 261 | + tags=set(), |
| 262 | + auto_register=True, |
| 263 | + ) |
| 264 | + __call__.assert_called_once_with() |
| 265 | + assert asset_definition._function.kwargs["outlets"] == [asset_definition] |
| 266 | + |
225 | 267 |
|
226 | 268 | class TestMultiAssetDefinition:
|
227 | 269 | @mock.patch("airflow.sdk.definitions.asset.decorators._AssetMainOperator.from_definition")
|
@@ -249,6 +291,37 @@ def test__attrs_post_init__(self, DAG, from_definition, example_asset_func_with_
|
249 | 291 | )
|
250 | 292 | from_definition.assert_called_once_with(definition)
|
251 | 293 |
|
| 294 | + @mock.patch("airflow.sdk.bases.decorator._TaskDecorator.__call__") |
| 295 | + @mock.patch("airflow.sdk.definitions.dag.DAG") |
| 296 | + def test_with_task_decorator(self, DAG, __call__, func_fixer): |
| 297 | + @task(retries=3) |
| 298 | + @func_fixer |
| 299 | + def _example_task_func(): |
| 300 | + return "This is example_task" |
| 301 | + |
| 302 | + definition = asset.multi( |
| 303 | + schedule=None, |
| 304 | + outlets=[Asset(name="a"), Asset(name="b")], |
| 305 | + )(_example_task_func) |
| 306 | + |
| 307 | + DAG.assert_called_once_with( |
| 308 | + dag_id="example_asset_func", |
| 309 | + dag_display_name="example_asset_func", |
| 310 | + description=None, |
| 311 | + schedule=None, |
| 312 | + catchup=False, |
| 313 | + is_paused_upon_creation=None, |
| 314 | + on_failure_callback=None, |
| 315 | + on_success_callback=None, |
| 316 | + params=None, |
| 317 | + access_control=None, |
| 318 | + owner_links={}, |
| 319 | + tags=set(), |
| 320 | + auto_register=True, |
| 321 | + ) |
| 322 | + __call__.assert_called_once_with() |
| 323 | + assert definition._function.kwargs["outlets"] == [Asset(name="a"), Asset(name="b")] |
| 324 | + |
252 | 325 |
|
253 | 326 | class Test_AssetMainOperator:
|
254 | 327 | def test_from_definition(self, example_asset_func_with_valid_arg_as_inlet_asset):
|
|
0 commit comments