Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 7f7cc6c

Browse files
committedMay 15, 2025·
remove session.query and fix pre-commit checks
1 parent 50fd96f commit 7f7cc6c

File tree

5 files changed

+16
-21
lines changed

5 files changed

+16
-21
lines changed
 

‎airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,9 @@ def ti_run(
235235

236236
xcom_keys = list(session.scalars(query))
237237
task_reschedule_count = (
238-
session.query(
239-
func.count(TaskReschedule.id) # or any other primary key column
240-
)
241-
.filter(TaskReschedule.ti_id == ti_id_str)
242-
.scalar()
238+
session.execute(
239+
select(func.count(TaskReschedule.id)).where(TaskReschedule.ti_id == ti_id_str)
240+
).scalar()
243241
or 0
244242
)
245243

‎airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ def set_xcom(
270270
if not run_id:
271271
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID: `{run_id}` was not found")
272272

273-
dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
273+
dag_run_id = session.execute(DagRun.id).where(dag_id=dag_id, run_id=run_id).scalar()
274274
if dag_run_id is None:
275275
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG run not found on DAG {dag_id} with ID {run_id}")
276276

‎airflow-core/src/airflow/dag_processing/bundles/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,13 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
166166

167167
if inactive_bundle_names and active_bundle_names:
168168
new_bundle_name = sorted(active_bundle_names)[0]
169-
169+
170170
updated_rows = session.execute(
171171
update(DagVersion)
172172
.where(DagVersion.bundle_name.in_(inactive_bundle_names))
173173
.values({DagVersion.bundle_name: new_bundle_name})
174174
.execution_options(synchronize_session=False)
175-
)
175+
)
176176

177177
self.log.info(
178178
"Updated %d DAG versions from inactive bundles to active bundle %s",

‎airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2058,17 +2058,16 @@ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NE
20582058

20592059
@provide_session
20602060
def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
2061-
running = (
2062-
session.query(
2061+
running = session.execute(
2062+
select(
20632063
TaskInstance.dag_id,
20642064
TaskInstance.task_id,
20652065
TaskInstance.queue,
20662066
func.count(TaskInstance.task_id).label("running_count"),
20672067
)
2068-
.filter(TaskInstance.state == State.RUNNING)
2068+
.where(TaskInstance.state == State.RUNNING)
20692069
.group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.queue)
2070-
.all()
2071-
)
2070+
).all()
20722071

20732072
ti_running_metrics = {(row.dag_id, row.task_id, row.queue): row.running_count for row in running}
20742073

‎airflow-core/src/airflow/models/xcom.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,9 @@ def clear(
144144

145145
query = select(cls).where(dag_id=dag_id, task_id=task_id, run_id=run_id)
146146
if map_index is not None:
147-
148147
query = query.where(map_index=map_index)
149-
150-
for xcom in session.scalars(query)
148+
149+
for xcom in session.scalars(query):
151150
# print(f"Clearing XCOM {xcom} with value {xcom.value}")
152151
session.delete(xcom)
153152

@@ -301,24 +300,23 @@ def get_many(
301300
query = query.where(cls.task_id == task_ids)
302301

303302
if is_container(dag_ids):
304-
305303
query = query.where(cls.dag_id.in_(dag_ids))
306304

307305
elif dag_ids is not None:
308306
query = query.where(cls.dag_id == dag_ids)
309307

310308
if isinstance(map_indexes, range) and map_indexes.step == 1:
311-
312309
query = query.where(cls.map_index >= map_indexes.start, cls.map_index < map_indexes.stop)
313310
elif is_container(map_indexes):
314311
query = query.where(cls.map_index.in_(map_indexes))
315312
elif map_indexes is not None:
316313
query = query.where(cls.map_index == map_indexes)
317314

318315
if include_prior_dates:
319-
320-
dr = (select(func.coalesce(DagRun.logical_date,DagRun.run_after)
321-
.label("logical_date_or_run_after"))
316+
dr = (
317+
select(
318+
func.coalesce(DagRun.logical_date, DagRun.run_after).label("logical_date_or_run_after")
319+
)
322320
.where(DagRun.run_id == run_id)
323321
.subquery()
324322
)

0 commit comments

Comments
 (0)
Please sign in to comment.