Skip to content

Commit 067eede

Browse files
committed
Store inlet references in database and expose API
1 parent e7aa359 commit 067eede

File tree

19 files changed

+1778
-1466
lines changed

19 files changed

+1778
-1466
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
789370f7c29cc290c307562a13f06576d8651e12916cc42f89653f8127c00210
1+
7e97c71ee6da77d758087a5d7129ac8d63b52dcc3bf4c46b7faf5977faa53ef4

airflow-core/docs/img/airflow_erd.svg

Lines changed: 1504 additions & 1459 deletions
Loading

airflow-core/docs/migrations-ref.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``3ac9e5732b1f`` (head) | ``0242ac120002`` | ``3.1.0`` | Change the on-delete behaviour of |
42+
| ``583e80dfcef4`` (head) | ``3ac9e5732b1f`` | ``3.1.0`` | Add task_inlet_asset_reference table. |
43+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
44+
| ``3ac9e5732b1f`` | ``0242ac120002`` | ``3.1.0`` | Change the on-delete behaviour of |
4345
| | | | task_instance.dag_version_id foreign key constraint to |
4446
| | | | RESTRICT. |
4547
+-------------------------+------------------+-------------------+--------------------------------------------------------------+

airflow-core/src/airflow/api_fastapi/common/parameters.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
AssetAliasModel,
4747
AssetModel,
4848
DagScheduleAssetReference,
49+
TaskInletAssetReference,
4950
TaskOutletAssetReference,
5051
)
5152
from airflow.models.connection import Connection
@@ -446,6 +447,7 @@ def to_orm(self, select: Select) -> Select:
446447
return select.where(
447448
(AssetModel.scheduled_dags.any(DagScheduleAssetReference.dag_id.in_(self.value)))
448449
| (AssetModel.producing_tasks.any(TaskOutletAssetReference.dag_id.in_(self.value)))
450+
| (AssetModel.consuming_tasks.any(TaskInletAssetReference.dag_id.in_(self.value)))
449451
)
450452

451453

airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ class DagScheduleAssetReference(StrictBaseModel):
3333
updated_at: datetime
3434

3535

36+
class TaskInletAssetReference(StrictBaseModel):
37+
"""Task inlet reference serializer for assets."""
38+
39+
dag_id: str
40+
task_id: str
41+
created_at: datetime
42+
updated_at: datetime
43+
44+
3645
class TaskOutletAssetReference(StrictBaseModel):
3746
"""Task outlet reference serializer for assets."""
3847

@@ -61,6 +70,7 @@ class AssetResponse(BaseModel):
6170
updated_at: datetime
6271
scheduled_dags: list[DagScheduleAssetReference]
6372
producing_tasks: list[TaskOutletAssetReference]
73+
consuming_tasks: list[TaskInletAssetReference]
6474
aliases: list[AssetAliasResponse]
6575
last_asset_event: LastAssetEventResponse | None = None
6676

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7239,6 +7239,11 @@ components:
72397239
$ref: '#/components/schemas/TaskOutletAssetReference'
72407240
type: array
72417241
title: Producing Tasks
7242+
consuming_tasks:
7243+
items:
7244+
$ref: '#/components/schemas/TaskInletAssetReference'
7245+
type: array
7246+
title: Consuming Tasks
72427247
aliases:
72437248
items:
72447249
$ref: '#/components/schemas/AssetAliasResponse'
@@ -7258,6 +7263,7 @@ components:
72587263
- updated_at
72597264
- scheduled_dags
72607265
- producing_tasks
7266+
- consuming_tasks
72617267
- aliases
72627268
title: AssetResponse
72637269
description: Asset serializer for responses.
@@ -9999,6 +10005,31 @@ components:
999910005
- reason
1000010006
title: TaskDependencyResponse
1000110007
description: Task Dependency serializer for responses.
10008+
TaskInletAssetReference:
10009+
properties:
10010+
dag_id:
10011+
type: string
10012+
title: Dag Id
10013+
task_id:
10014+
type: string
10015+
title: Task Id
10016+
created_at:
10017+
type: string
10018+
format: date-time
10019+
title: Created At
10020+
updated_at:
10021+
type: string
10022+
format: date-time
10023+
title: Updated At
10024+
additionalProperties: false
10025+
type: object
10026+
required:
10027+
- dag_id
10028+
- task_id
10029+
- created_at
10030+
- updated_at
10031+
title: TaskInletAssetReference
10032+
description: Task inlet reference serializer for assets.
1000210033
TaskInstanceCollectionResponse:
1000310034
properties:
1000410035
task_instances:

airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def get_assets(
181181
assets_select.options(
182182
subqueryload(AssetModel.scheduled_dags),
183183
subqueryload(AssetModel.producing_tasks),
184+
subqueryload(AssetModel.consuming_tasks),
184185
)
185186
)
186187

@@ -456,7 +457,11 @@ def get_asset(
456457
asset = session.scalar(
457458
select(AssetModel)
458459
.where(AssetModel.id == asset_id)
459-
.options(joinedload(AssetModel.scheduled_dags), joinedload(AssetModel.producing_tasks))
460+
.options(
461+
joinedload(AssetModel.scheduled_dags),
462+
joinedload(AssetModel.producing_tasks),
463+
joinedload(AssetModel.consuming_tasks),
464+
)
460465
)
461466

462467
last_asset_event_id = asset_event_rows[1] if asset_event_rows else None

airflow-core/src/airflow/dag_processing/collection.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
DagScheduleAssetNameReference,
4545
DagScheduleAssetReference,
4646
DagScheduleAssetUriReference,
47+
TaskInletAssetReference,
4748
TaskOutletAssetReference,
4849
)
4950
from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag
@@ -623,6 +624,7 @@ class AssetModelOperation(NamedTuple):
623624
schedule_asset_alias_references: dict[str, list[AssetAlias]]
624625
schedule_asset_name_references: set[tuple[str, str]] # dag_id, ref_name.
625626
schedule_asset_uri_references: set[tuple[str, str]] # dag_id, ref_uri.
627+
inlet_references: dict[str, list[tuple[str, Asset]]]
626628
outlet_references: dict[str, list[tuple[str, Asset]]]
627629
assets: dict[tuple[str, str], Asset]
628630
asset_aliases: dict[str, AssetAlias]
@@ -650,6 +652,9 @@ def collect(cls, dags: dict[str, MaybeSerializedDAG]) -> Self:
650652
for ref in dag.timetable.asset_condition.iter_asset_refs()
651653
if isinstance(ref, AssetUriRef)
652654
},
655+
inlet_references={
656+
dag_id: list(dag.get_task_assets(inlets=True, outlets=False)) for dag_id, dag in dags.items()
657+
},
653658
outlet_references={
654659
dag_id: list(dag.get_task_assets(inlets=False, outlets=True)) for dag_id, dag in dags.items()
655660
},
@@ -829,6 +834,24 @@ def add_task_asset_references(
829834
# Optimization: No assets means there are no references to update.
830835
if not assets:
831836
return
837+
for dag_id, references in self.inlet_references.items():
838+
# Optimization: no references at all; this is faster than repeated delete().
839+
if not references:
840+
dags[dag_id].task_inlet_asset_references = []
841+
continue
842+
referenced_inlets = {
843+
(task_id, asset.id)
844+
for task_id, asset in ((task_id, assets[d.name, d.uri]) for task_id, d in references)
845+
}
846+
orm_refs = {(r.task_id, r.asset_id): r for r in dags[dag_id].task_inlet_asset_references}
847+
for key, ref in orm_refs.items():
848+
if key not in referenced_inlets:
849+
session.delete(ref)
850+
session.bulk_save_objects(
851+
TaskInletAssetReference(asset_id=asset_id, dag_id=dag_id, task_id=task_id)
852+
for task_id, asset_id in referenced_inlets
853+
if (task_id, asset_id) not in orm_refs
854+
)
832855
for dag_id, references in self.outlet_references.items():
833856
# Optimization: no references at all; this is faster than repeated delete().
834857
if not references:

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2366,7 +2366,12 @@ def _activate_referenced_assets(assets: Collection[AssetModel], *, session: Sess
23662366
def _generate_warning_message(
23672367
offending: AssetModel, attr: str, value: str
23682368
) -> Iterator[tuple[str, str]]:
2369-
for ref in itertools.chain(offending.scheduled_dags, offending.producing_tasks):
2369+
offending_references = itertools.chain(
2370+
offending.scheduled_dags,
2371+
offending.producing_tasks,
2372+
offending.consuming_tasks,
2373+
)
2374+
for ref in offending_references:
23702375
yield (
23712376
ref.dag_id,
23722377
(
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add task_inlet_asset_reference table.
21+
22+
Revision ID: 583e80dfcef4
23+
Revises: 3ac9e5732b1f
24+
Create Date: 2025-06-04 06:26:36.536172
25+
"""
26+
27+
from __future__ import annotations
28+
29+
from alembic import op
30+
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint
31+
32+
from airflow.migrations.db_types import StringID
33+
from airflow.utils.sqlalchemy import UtcDateTime
34+
35+
revision = "583e80dfcef4"
36+
down_revision = "3ac9e5732b1f"
37+
branch_labels = None
38+
depends_on = None
39+
airflow_version = "3.1.0"
40+
41+
42+
def upgrade():
43+
"""Add task_inlet_asset_reference table."""
44+
op.create_table(
45+
"task_inlet_asset_reference",
46+
Column("asset_id", Integer, primary_key=True, nullable=False),
47+
Column("dag_id", StringID(), primary_key=True, nullable=False),
48+
Column("task_id", StringID(), primary_key=True, nullable=False),
49+
Column("created_at", UtcDateTime, nullable=False),
50+
Column("updated_at", UtcDateTime, nullable=False),
51+
ForeignKeyConstraint(
52+
["asset_id"],
53+
["asset.id"],
54+
name="tiar_asset_fkey",
55+
ondelete="CASCADE",
56+
),
57+
PrimaryKeyConstraint("asset_id", "dag_id", "task_id", name="tiar_pkey"),
58+
ForeignKeyConstraint(
59+
columns=["dag_id"],
60+
refcolumns=["dag.dag_id"],
61+
name="tiar_dag_id_fkey",
62+
ondelete="CASCADE",
63+
),
64+
Index("idx_task_inlet_asset_reference_dag_id", "dag_id"),
65+
)
66+
67+
68+
def downgrade():
69+
"""Remove task_inlet_asset_reference table."""
70+
op.drop_table("task_inlet_asset_reference")

airflow-core/src/airflow/models/asset.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ class AssetModel(Base):
274274

275275
scheduled_dags = relationship("DagScheduleAssetReference", back_populates="asset")
276276
producing_tasks = relationship("TaskOutletAssetReference", back_populates="asset")
277+
consuming_tasks = relationship("TaskInletAssetReference", back_populates="asset")
277278
triggers = relationship("Trigger", secondary=asset_trigger_association_table, back_populates="assets")
278279

279280
__tablename__ = "asset"
@@ -612,6 +613,50 @@ def __repr__(self):
612613
return f"{self.__class__.__name__}({', '.join(args)})"
613614

614615

616+
class TaskInletAssetReference(Base):
617+
"""References from a task to an asset that it references as an inlet."""
618+
619+
asset_id = Column(Integer, primary_key=True, nullable=False)
620+
dag_id = Column(StringID(), primary_key=True, nullable=False)
621+
task_id = Column(StringID(), primary_key=True, nullable=False)
622+
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
623+
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
624+
625+
asset = relationship("AssetModel", back_populates="consuming_tasks")
626+
627+
__tablename__ = "task_inlet_asset_reference"
628+
__table_args__ = (
629+
ForeignKeyConstraint(
630+
(asset_id,),
631+
["asset.id"],
632+
name="tiar_asset_fkey",
633+
ondelete="CASCADE",
634+
),
635+
PrimaryKeyConstraint(asset_id, dag_id, task_id, name="tiar_pkey"),
636+
ForeignKeyConstraint(
637+
columns=(dag_id,),
638+
refcolumns=["dag.dag_id"],
639+
name="tiar_dag_id_fkey",
640+
ondelete="CASCADE",
641+
),
642+
Index("idx_task_inlet_asset_reference_dag_id", dag_id),
643+
)
644+
645+
def __eq__(self, other: object) -> bool:
646+
if not isinstance(other, self.__class__):
647+
return NotImplemented
648+
return (
649+
self.asset_id == other.asset_id and self.dag_id == other.dag_id and self.task_id == other.task_id
650+
)
651+
652+
def __hash__(self):
653+
return hash(self.__mapper__.primary_key)
654+
655+
def __repr__(self):
656+
args = (f"{(attr := x.name)}={getattr(self, attr)!r}" for x in self.__mapper__.primary_key)
657+
return f"{self.__class__.__name__}({', '.join(args)})"
658+
659+
615660
class AssetDagRunQueue(Base):
616661
"""Model for storing asset events that need processing."""
617662

airflow-core/src/airflow/models/dag.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1756,7 +1756,7 @@ def get_task_assets(
17561756
of_type: type[AssetT] = Asset, # type: ignore[assignment]
17571757
) -> Generator[tuple[str, AssetT], None, None]:
17581758
for task in self.task_dict.values():
1759-
directions = ("inlets",) if inlets else ()
1759+
directions: tuple[str, ...] = ("inlets",) if inlets else ()
17601760
if outlets:
17611761
directions += ("outlets",)
17621762
for direction in directions:
@@ -1966,6 +1966,10 @@ class DagModel(Base):
19661966
cascade="all, delete, delete-orphan",
19671967
)
19681968
schedule_assets = association_proxy("schedule_asset_references", "asset")
1969+
task_inlet_asset_references = relationship(
1970+
"TaskInletAssetReference",
1971+
cascade="all, delete, delete-orphan",
1972+
)
19691973
task_outlet_asset_references = relationship(
19701974
"TaskOutletAssetReference",
19711975
cascade="all, delete, delete-orphan",

0 commit comments

Comments
 (0)