Skip to content

Commit cd206bf

Browse files
uranusjrRoyLee1224
authored andcommitted
Store and expose task inlet references to assets (apache#51424)
1 parent a13699c commit cd206bf

File tree

27 files changed

+1856
-1515
lines changed

27 files changed

+1856
-1515
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
789370f7c29cc290c307562a13f06576d8651e12916cc42f89653f8127c00210
1+
7e97c71ee6da77d758087a5d7129ac8d63b52dcc3bf4c46b7faf5977faa53ef4

airflow-core/docs/img/airflow_erd.svg

Lines changed: 1504 additions & 1459 deletions
Loading

airflow-core/docs/migrations-ref.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``3ac9e5732b1f`` (head) | ``0242ac120002`` | ``3.1.0`` | Change the on-delete behaviour of |
42+
| ``583e80dfcef4`` (head) | ``3ac9e5732b1f`` | ``3.1.0`` | Add task_inlet_asset_reference table. |
43+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
44+
| ``3ac9e5732b1f`` | ``0242ac120002`` | ``3.1.0`` | Change the on-delete behaviour of |
4345
| | | | task_instance.dag_version_id foreign key constraint to |
4446
| | | | RESTRICT. |
4547
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
The ``consuming_dags`` key in asset API has been renamed to ``scheduled_dags``.
2+
3+
The previous name caused confusion to users since the list does not contain all
4+
dags that technically *use* the asset, but only those that use it in their
5+
``schedule`` argument. As a bug fix, the key has been renamed to clarify its
6+
intention.
7+
8+
* Types of change
9+
10+
* [ ] Dag changes
11+
* [ ] Config changes
12+
* [x] API changes
13+
* [ ] CLI changes
14+
* [ ] Behaviour changes
15+
* [ ] Plugin changes
16+
* [ ] Dependency changes
17+
* [ ] Code interface changes

airflow-core/src/airflow/api_fastapi/common/parameters.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
AssetAliasModel,
4747
AssetModel,
4848
DagScheduleAssetReference,
49+
TaskInletAssetReference,
4950
TaskOutletAssetReference,
5051
)
5152
from airflow.models.connection import Connection
@@ -431,7 +432,7 @@ class _DagIdAssetReferenceFilter(BaseParam[list[str]]):
431432
"""Search on dag_id."""
432433

433434
def __init__(self, skip_none: bool = True) -> None:
434-
super().__init__(AssetModel.consuming_dags, skip_none)
435+
super().__init__(AssetModel.scheduled_dags, skip_none)
435436

436437
@classmethod
437438
def depends(cls, dag_ids: list[str] = Query(None)) -> _DagIdAssetReferenceFilter:
@@ -444,8 +445,9 @@ def to_orm(self, select: Select) -> Select:
444445
if self.value is None and self.skip_none:
445446
return select
446447
return select.where(
447-
(AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(self.value)))
448+
(AssetModel.scheduled_dags.any(DagScheduleAssetReference.dag_id.in_(self.value)))
448449
| (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(self.value)))
450+
| (AssetModel.consuming_tasks.any(TaskInletAssetReference.dag_id.in_(self.value)))
449451
)
450452

451453

airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ class DagScheduleAssetReference(StrictBaseModel):
3333
updated_at: datetime
3434

3535

36+
class TaskInletAssetReference(StrictBaseModel):
37+
"""Task inlet reference serializer for assets."""
38+
39+
dag_id: str
40+
task_id: str
41+
created_at: datetime
42+
updated_at: datetime
43+
44+
3645
class TaskOutletAssetReference(StrictBaseModel):
3746
"""Task outlet reference serializer for assets."""
3847

@@ -59,8 +68,9 @@ class AssetResponse(BaseModel):
5968
extra: dict | None = None
6069
created_at: datetime
6170
updated_at: datetime
62-
consuming_dags: list[DagScheduleAssetReference]
71+
scheduled_dags: list[DagScheduleAssetReference]
6372
producing_tasks: list[TaskOutletAssetReference]
73+
consuming_tasks: list[TaskInletAssetReference]
6474
aliases: list[AssetAliasResponse]
6575
last_asset_event: LastAssetEventResponse | None = None
6676

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7229,16 +7229,21 @@ components:
72297229
type: string
72307230
format: date-time
72317231
title: Updated At
7232-
consuming_dags:
7232+
scheduled_dags:
72337233
items:
72347234
$ref: '#/components/schemas/DagScheduleAssetReference'
72357235
type: array
7236-
title: Consuming Dags
7236+
title: Scheduled Dags
72377237
producing_tasks:
72387238
items:
72397239
$ref: '#/components/schemas/TaskOutletAssetReference'
72407240
type: array
72417241
title: Producing Tasks
7242+
consuming_tasks:
7243+
items:
7244+
$ref: '#/components/schemas/TaskInletAssetReference'
7245+
type: array
7246+
title: Consuming Tasks
72427247
aliases:
72437248
items:
72447249
$ref: '#/components/schemas/AssetAliasResponse'
@@ -7256,8 +7261,9 @@ components:
72567261
- group
72577262
- created_at
72587263
- updated_at
7259-
- consuming_dags
7264+
- scheduled_dags
72607265
- producing_tasks
7266+
- consuming_tasks
72617267
- aliases
72627268
title: AssetResponse
72637269
description: Asset serializer for responses.
@@ -9999,6 +10005,31 @@ components:
999910005
- reason
1000010006
title: TaskDependencyResponse
1000110007
description: Task Dependency serializer for responses.
10008+
TaskInletAssetReference:
10009+
properties:
10010+
dag_id:
10011+
type: string
10012+
title: Dag Id
10013+
task_id:
10014+
type: string
10015+
title: Task Id
10016+
created_at:
10017+
type: string
10018+
format: date-time
10019+
title: Created At
10020+
updated_at:
10021+
type: string
10022+
format: date-time
10023+
title: Updated At
10024+
additionalProperties: false
10025+
type: object
10026+
required:
10027+
- dag_id
10028+
- task_id
10029+
- created_at
10030+
- updated_at
10031+
title: TaskInletAssetReference
10032+
description: Task inlet reference serializer for assets.
1000210033
TaskInstanceCollectionResponse:
1000310034
properties:
1000410035
task_instances:

airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ def get_assets(
179179

180180
assets_rows = session.execute(
181181
assets_select.options(
182-
subqueryload(AssetModel.consuming_dags),
182+
subqueryload(AssetModel.scheduled_dags),
183183
subqueryload(AssetModel.producing_tasks),
184+
subqueryload(AssetModel.consuming_tasks),
184185
)
185186
)
186187

@@ -456,7 +457,11 @@ def get_asset(
456457
asset = session.scalar(
457458
select(AssetModel)
458459
.where(AssetModel.id == asset_id)
459-
.options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
460+
.options(
461+
joinedload(AssetModel.scheduled_dags),
462+
joinedload(AssetModel.producing_tasks),
463+
joinedload(AssetModel.consuming_tasks),
464+
)
460465
)
461466

462467
last_asset_event_id = asset_event_rows[1] if asset_event_rows else None

airflow-core/src/airflow/assets/manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def register_asset_change(
130130
.options(
131131
joinedload(AssetModel.active),
132132
joinedload(AssetModel.aliases),
133-
joinedload(AssetModel.consuming_dags).joinedload(DagScheduleAssetReference.dag),
133+
joinedload(AssetModel.scheduled_dags).joinedload(DagScheduleAssetReference.dag),
134134
)
135135
)
136136
if not asset_model:
@@ -161,7 +161,7 @@ def register_asset_change(
161161
session.flush() # Ensure the event is written earlier than DDRQ entries below.
162162

163163
dags_to_queue_from_asset = {
164-
ref.dag for ref in asset_model.consuming_dags if not ref.dag.is_stale and not ref.dag.is_paused
164+
ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused
165165
}
166166

167167
dags_to_queue_from_asset_alias = set()
@@ -170,7 +170,7 @@ def register_asset_change(
170170
select(AssetAliasModel)
171171
.where(AssetAliasModel.name.in_(source_alias_names))
172172
.options(
173-
joinedload(AssetAliasModel.consuming_dags).joinedload(DagScheduleAssetAliasReference.dag)
173+
joinedload(AssetAliasModel.scheduled_dags).joinedload(DagScheduleAssetAliasReference.dag)
174174
)
175175
).unique()
176176

@@ -180,7 +180,7 @@ def register_asset_change(
180180

181181
dags_to_queue_from_asset_alias |= {
182182
alias_ref.dag
183-
for alias_ref in asset_alias_model.consuming_dags
183+
for alias_ref in asset_alias_model.scheduled_dags
184184
if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused
185185
}
186186

airflow-core/src/airflow/dag_processing/collection.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
DagScheduleAssetNameReference,
4545
DagScheduleAssetReference,
4646
DagScheduleAssetUriReference,
47+
TaskInletAssetReference,
4748
TaskOutletAssetReference,
4849
)
4950
from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag
@@ -608,7 +609,7 @@ def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Ses
608609
select(AssetModel.name, AssetModel.uri).where(
609610
tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
610611
AssetModel.active.has(),
611-
AssetModel.consuming_dags.any(
612+
AssetModel.scheduled_dags.any(
612613
DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
613614
),
614615
)
@@ -623,6 +624,7 @@ class AssetModelOperation(NamedTuple):
623624
schedule_asset_alias_references: dict[str, list[AssetAlias]]
624625
schedule_asset_name_references: set[tuple[str, str]] # dag_id, ref_name.
625626
schedule_asset_uri_references: set[tuple[str, str]] # dag_id, ref_uri.
627+
inlet_references: dict[str, list[tuple[str, Asset]]]
626628
outlet_references: dict[str, list[tuple[str, Asset]]]
627629
assets: dict[tuple[str, str], Asset]
628630
asset_aliases: dict[str, AssetAlias]
@@ -650,6 +652,9 @@ def collect(cls, dags: dict[str, MaybeSerializedDAG]) -> Self:
650652
for ref in dag.timetable.asset_condition.iter_asset_refs()
651653
if isinstance(ref, AssetUriRef)
652654
},
655+
inlet_references={
656+
dag_id: list(dag.get_task_assets(inlets=True, outlets=False)) for dag_id, dag in dags.items()
657+
},
653658
outlet_references={
654659
dag_id: list(dag.get_task_assets(inlets=False, outlets=True)) for dag_id, dag in dags.items()
655660
},
@@ -829,6 +834,24 @@ def add_task_asset_references(
829834
# Optimization: No assets means there are no references to update.
830835
if not assets:
831836
return
837+
for dag_id, references in self.inlet_references.items():
838+
# Optimization: no references at all; this is faster than repeated delete().
839+
if not references:
840+
dags[dag_id].task_inlet_asset_references = []
841+
continue
842+
referenced_inlets = {
843+
(task_id, asset.id)
844+
for task_id, asset in ((task_id, assets[d.name, d.uri]) for task_id, d in references)
845+
}
846+
orm_refs = {(r.task_id, r.asset_id): r for r in dags[dag_id].task_inlet_asset_references}
847+
for key, ref in orm_refs.items():
848+
if key not in referenced_inlets:
849+
session.delete(ref)
850+
session.bulk_save_objects(
851+
TaskInletAssetReference(asset_id=asset_id, dag_id=dag_id, task_id=task_id)
852+
for task_id, asset_id in referenced_inlets
853+
if (task_id, asset_id) not in orm_refs
854+
)
832855
for dag_id, references in self.outlet_references.items():
833856
# Optimization: no references at all; this is faster than repeated delete().
834857
if not references:
@@ -949,7 +972,7 @@ def add_asset_trigger_references(
949972

950973
# Remove references from assets no longer used
951974
orphan_assets = session.scalars(
952-
select(AssetModel).filter(~AssetModel.consuming_dags.any()).filter(AssetModel.triggers.any())
975+
select(AssetModel).filter(~AssetModel.scheduled_dags.any()).filter(AssetModel.triggers.any())
953976
)
954977
for asset_model in orphan_assets:
955978
if (asset_model.name, asset_model.uri) not in self.assets:

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2375,7 +2375,12 @@ def _activate_referenced_assets(assets: Collection[AssetModel], *, session: Sess
23752375
def _generate_warning_message(
23762376
offending: AssetModel, attr: str, value: str
23772377
) -> Iterator[tuple[str, str]]:
2378-
for ref in itertools.chain(offending.consuming_dags, offending.producing_tasks):
2378+
offending_references = itertools.chain(
2379+
offending.scheduled_dags,
2380+
offending.producing_tasks,
2381+
offending.consuming_tasks,
2382+
)
2383+
for ref in offending_references:
23792384
yield (
23802385
ref.dag_id,
23812386
(
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add task_inlet_asset_reference table.
21+
22+
Revision ID: 583e80dfcef4
23+
Revises: 3ac9e5732b1f
24+
Create Date: 2025-06-04 06:26:36.536172
25+
"""
26+
27+
from __future__ import annotations
28+
29+
from alembic import op
30+
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint
31+
32+
from airflow.migrations.db_types import StringID
33+
from airflow.utils.sqlalchemy import UtcDateTime
34+
35+
revision = "583e80dfcef4"
36+
down_revision = "3ac9e5732b1f"
37+
branch_labels = None
38+
depends_on = None
39+
airflow_version = "3.1.0"
40+
41+
42+
def upgrade():
43+
"""Add task_inlet_asset_reference table."""
44+
op.create_table(
45+
"task_inlet_asset_reference",
46+
Column("asset_id", Integer, primary_key=True, nullable=False),
47+
Column("dag_id", StringID(), primary_key=True, nullable=False),
48+
Column("task_id", StringID(), primary_key=True, nullable=False),
49+
Column("created_at", UtcDateTime, nullable=False),
50+
Column("updated_at", UtcDateTime, nullable=False),
51+
ForeignKeyConstraint(
52+
["asset_id"],
53+
["asset.id"],
54+
name="tiar_asset_fkey",
55+
ondelete="CASCADE",
56+
),
57+
PrimaryKeyConstraint("asset_id", "dag_id", "task_id", name="tiar_pkey"),
58+
ForeignKeyConstraint(
59+
columns=["dag_id"],
60+
refcolumns=["dag.dag_id"],
61+
name="tiar_dag_id_fkey",
62+
ondelete="CASCADE",
63+
),
64+
Index("idx_task_inlet_asset_reference_dag_id", "dag_id"),
65+
)
66+
67+
68+
def downgrade():
69+
"""Remove task_inlet_asset_reference table."""
70+
op.drop_table("task_inlet_asset_reference")

0 commit comments

Comments
 (0)