Skip to content

Commit 7bd4198

Browse files
choo121600 authored and HsiuChuanHsu committed
Fix Dag list filtering to include QUEUED DagRuns with null start_date (apache#52668)
* Fix inconsistent queued DAG filtering between dashboard and list page
* Fix Dag list filtering to include DagRuns with null start_date
* Remove unnecessary @provide_session decorators from test_dags.py
1 parent 6437f45 commit 7bd4198

File tree

3 files changed

+282
-2
lines changed

3 files changed

+282
-2
lines changed

airflow-core/src/airflow/api_fastapi/common/db/dags.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from typing import TYPE_CHECKING
2121

22-
from sqlalchemy import func, null, select
22+
from sqlalchemy import func, select
2323

2424
from airflow.api_fastapi.common.db.common import (
2525
apply_filters_to_select,
@@ -37,7 +37,6 @@ def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_b
3737

3838
max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
3939
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
40-
.where(DagRun.start_date.is_not(null()))
4140
.group_by(DagRun.dag_id)
4241
.subquery(name="mrq")
4342
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from datetime import datetime, timezone
21+
22+
import pytest
23+
24+
from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query
25+
from airflow.api_fastapi.common.parameters import SortParam
26+
from airflow.models import DagModel
27+
from airflow.models.dagrun import DagRun
28+
from airflow.utils.state import DagRunState
29+
from airflow.utils.timezone import utcnow
30+
31+
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
32+
33+
# Module-level pytest marker: applies the ``db_test`` mark to every test in this module.
pytestmark = pytest.mark.db_test
34+
35+
36+
class TestGenerateDagWithLatestRunQuery:
    """Unit tests for generate_dag_with_latest_run_query function.

    The tests guard against the latest-run subquery dropping DagRuns whose
    ``start_date`` is NULL (e.g. QUEUED runs). If such runs are filtered out,
    a Dag is joined either to no DagRun at all or to a stale, older run.
    """

    @staticmethod
    def _clear_db():
        """Clear DagRun and Dag data using the shared test DB helpers."""
        clear_db_runs()
        clear_db_dags()

    @staticmethod
    def _add_dag_model(session, dag_id, fileloc):
        """Add a non-stale, unpaused DagModel to ``session``, flush it and return it."""
        dag_model = DagModel(
            dag_id=dag_id,
            is_stale=False,
            is_paused=False,
            fileloc=fileloc,
        )
        session.add(dag_model)
        session.flush()
        return dag_model

    @staticmethod
    def _latest_run_rows(session, order_by_attr):
        """Execute the query under test and index the result rows by ``dag_id``.

        The query is built without run filters and ordered by ``order_by_attr``.
        ``DagRun.state`` and ``DagRun.start_date`` are added as extra columns, so
        every row has the shape ``(DagModel, state, start_date)``.
        """
        query = generate_dag_with_latest_run_query(
            max_run_filters=[],
            order_by=SortParam(allowed_attrs=[order_by_attr], model=DagModel).set_value(order_by_attr),
        )
        extended_query = query.add_columns(DagRun.state, DagRun.start_date)
        return {row[0].dag_id: row for row in session.execute(extended_query).fetchall()}

    @pytest.fixture(autouse=True)
    def setup_teardown(self):
        """Setup and teardown for each test."""
        self._clear_db()
        yield
        self._clear_db()

    @pytest.fixture
    def dag_with_queued_run(self, session):
        """Returns a DAG with a QUEUED DagRun and null start_date."""
        dag_id = "dag_with_queued_run"
        dag_model = self._add_dag_model(session, dag_id, "/tmp/dag.py")

        # Create DagRun with start_date=None (QUEUED state)
        dagrun = DagRun(
            dag_id=dag_id,
            run_id="manual__queued",
            run_type="manual",
            logical_date=utcnow(),
            state=DagRunState.QUEUED,
            start_date=None,
        )
        session.add(dagrun)
        session.commit()

        return dag_model, dagrun

    @pytest.fixture
    def dag_with_running_run(self, session):
        """Returns a DAG with a RUNNING DagRun and a valid start_date."""
        dag_id = "dag_with_running_run"
        dag_model = self._add_dag_model(session, dag_id, "/tmp/dag2.py")

        # Create DagRun with start_date set (RUNNING state)
        start_time = utcnow()
        dagrun = DagRun(
            dag_id=dag_id,
            run_id="manual__running",
            run_type="manual",
            logical_date=start_time,
            state=DagRunState.RUNNING,
            start_date=start_time,
        )
        session.add(dagrun)
        session.commit()

        return dag_model, dagrun

    def test_includes_queued_run_without_start_date(self, dag_with_queued_run, session):
        """DAGs with QUEUED runs and null start_date should be included when no filters are applied, and joined DagRun state must not be None."""
        dag_model, _ = dag_with_queued_run

        dag_row = self._latest_run_rows(session, "dag_id").get(dag_model.dag_id)
        assert dag_row is not None

        _, dagrun_state, dagrun_start_date = dag_row
        assert dagrun_state is not None, "Joined DagRun state must not be None"
        # The joined run must be the QUEUED one created by the fixture.
        assert dagrun_state == DagRunState.QUEUED
        assert dagrun_start_date is None

    def test_includes_queued_run_when_ordering_by_state(
        self, dag_with_queued_run, dag_with_running_run, session
    ):
        """DAGs with QUEUED runs and null start_date, and RUNNING runs must all have joined DagRun info not None."""
        queued_dag_model, _ = dag_with_queued_run
        running_dag_model, _ = dag_with_running_run

        rows = self._latest_run_rows(session, "last_run_state")

        # QUEUED DAG
        queued_row = rows.get(queued_dag_model.dag_id)
        assert queued_row is not None
        assert queued_row[1] is not None, "Joined DagRun state for QUEUED DAG must not be None"
        # RUNNING DAG
        running_row = rows.get(running_dag_model.dag_id)
        assert running_row is not None
        assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None"
        assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None"

    def test_includes_queued_run_when_ordering_by_start_date(
        self, dag_with_queued_run, dag_with_running_run, session
    ):
        """DAGs with QUEUED runs and RUNNING runs must all have joined DagRun info not None when ordering by start_date."""
        queued_dag_model, _ = dag_with_queued_run
        running_dag_model, _ = dag_with_running_run

        rows = self._latest_run_rows(session, "last_run_start_date")

        # QUEUED DAG
        queued_row = rows.get(queued_dag_model.dag_id)
        assert queued_row is not None
        assert queued_row[1] is not None, "Joined DagRun state for QUEUED DAG must not be None"
        # RUNNING DAG
        running_row = rows.get(running_dag_model.dag_id)
        assert running_row is not None
        assert running_row[1] is not None, "Joined DagRun state for RUNNING DAG must not be None"
        assert running_row[2] is not None, "Joined DagRun start_date for RUNNING DAG must not be None"

    def test_latest_queued_run_without_start_date_is_included(self, session):
        """Even if the latest DagRun is QUEUED+start_date=None, it must be the run that is joined."""
        dag_id = "dag_with_multiple_runs"
        self._add_dag_model(session, dag_id, "/tmp/dag3.py")

        older_run = DagRun(
            dag_id=dag_id,
            run_id="manual__older",
            run_type="manual",
            logical_date=datetime(2025, 1, 1, tzinfo=timezone.utc),
            state=DagRunState.SUCCESS,
            start_date=datetime(2025, 1, 1, tzinfo=timezone.utc),
        )
        session.add(older_run)
        # The query picks the run with max(DagRun.id); flush here so the older
        # run is inserted first and the queued run gets the higher id.
        session.flush()

        newer_run = DagRun(
            dag_id=dag_id,
            run_id="manual__newer_queued",
            run_type="manual",
            logical_date=utcnow(),
            state=DagRunState.QUEUED,
            start_date=None,
        )
        session.add(newer_run)
        session.commit()

        dag_row = self._latest_run_rows(session, "last_run_state").get(dag_id)
        assert dag_row is not None

        _, dagrun_state, dagrun_start_date = dag_row
        assert dagrun_state is not None, (
            "Even if latest DagRun is QUEUED+start_date=None, state must not be None"
        )
        # A bare "is not None" check would also pass if the older SUCCESS run were
        # joined instead (which is what a start_date IS NOT NULL filter produces),
        # so pin the joined run to the newer QUEUED one.
        assert dagrun_state == DagRunState.QUEUED, (
            "Latest QUEUED DagRun must be joined, not an older run that has a start_date"
        )
        assert dagrun_start_date is None

    def test_queued_runs_with_null_start_date_are_properly_joined(
        self, dag_with_queued_run, dag_with_running_run, session
    ):
        """
        Verifies that DAGs with null start_date are properly joined in the query.

        If a WHERE clause filters out null start_dates, these DAGs would be excluded.
        This test ensures they are still present and joined correctly.
        """
        queued_dag_model, _ = dag_with_queued_run
        running_dag_model, _ = dag_with_running_run

        rows = self._latest_run_rows(session, "last_run_state")
        queued_dag_result = rows.get(queued_dag_model.dag_id)
        running_dag_result = rows.get(running_dag_model.dag_id)

        # Assert both DAGs are present
        assert queued_dag_result is not None, f"Queued DAG {queued_dag_model.dag_id} should be in results"
        assert running_dag_result is not None, f"Running DAG {running_dag_model.dag_id} should be in results"

        # if WHERE start_date IS NOT NULL is present,
        # the queued DAG should have NO DagRun information joined (state=None, start_date=None)
        # But the running DAG should have DagRun information joined
        queued_dagrun_state = queued_dag_result[1]
        running_dagrun_state = running_dag_result[1]
        assert queued_dagrun_state is not None, (
            "Queued DAG should have DagRun state joined, but got None. "
            "This suggests the WHERE start_date IS NOT NULL condition is excluding it."
        )
        assert running_dagrun_state is not None, "Running DAG should have DagRun state joined"

0 commit comments

Comments
 (0)