Skip to content

Commit 86843ad

Browse files
committed
feat(hitl): add filters and order by to get_hitl_details public API
* filter: dag_id, dag_run_id, ti_state, response_recevied, user_id, subject, body * sort_by: dag_id, run_id
1 parent 234a163 commit 86843ad

File tree

6 files changed

+219
-39
lines changed

6 files changed

+219
-39
lines changed

airflow-core/src/airflow/api_fastapi/common/parameters.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from airflow.models.dag_favorite import DagFavorite
5353
from airflow.models.dag_version import DagVersion
5454
from airflow.models.dagrun import DagRun
55+
from airflow.models.hitl import HITLDetail
5556
from airflow.models.pool import Pool
5657
from airflow.models.taskinstance import TaskInstance
5758
from airflow.models.variable import Variable
@@ -750,3 +751,27 @@ def _optional_boolean(value: bool | None) -> bool | None:
750751
QueryConnectionIdPatternSearch = Annotated[
751752
_SearchParam, Depends(search_param_factory(Connection.conn_id, "connection_id_pattern"))
752753
]
754+
755+
# Human in the loop
756+
QueryDagRunIdFilter = Annotated[
757+
FilterParam[list[str]],
758+
Depends(filter_param_factory(DagRun.id, list[str], filter_name="dag_run_id")),
759+
]
760+
QueryHITLDetailResponseReceivedFilter = Annotated[
761+
FilterParam[bool | None],
762+
Depends(
763+
filter_param_factory(
764+
HITLDetail.response_received, bool | None, filter_name="hitl_detail_response_received"
765+
)
766+
),
767+
]
768+
QueryHITLDetailUserIdFilter = Annotated[
769+
FilterParam[list[str]],
770+
Depends(filter_param_factory(HITLDetail.user_id, list[str], filter_name="hitl_detail_user_id")),
771+
]
772+
QueryHITLDetailSubjectSearch = Annotated[
773+
_SearchParam, Depends(search_param_factory(HITLDetail.subject, "hitl_detail_subject_search"))
774+
]
775+
QueryHITLDetailBodySearch = Annotated[
776+
_SearchParam, Depends(search_param_factory(HITLDetail.body, "hitl_detail_body_search"))
777+
]

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7399,6 +7399,100 @@ paths:
73997399
summary: Get Hitl Details
74007400
description: Get Human-in-the-loop details.
74017401
operationId: get_hitl_details
7402+
security:
7403+
- OAuth2PasswordBearer: []
7404+
parameters:
7405+
- name: limit
7406+
in: query
7407+
required: false
7408+
schema:
7409+
type: integer
7410+
minimum: 0
7411+
default: 50
7412+
title: Limit
7413+
- name: offset
7414+
in: query
7415+
required: false
7416+
schema:
7417+
type: integer
7418+
minimum: 0
7419+
default: 0
7420+
title: Offset
7421+
- name: order_by
7422+
in: query
7423+
required: false
7424+
schema:
7425+
type: string
7426+
default: map_index
7427+
title: Order By
7428+
- name: dag_id_pattern
7429+
in: query
7430+
required: false
7431+
schema:
7432+
anyOf:
7433+
- type: string
7434+
- type: 'null'
7435+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7436+
\ Regular expressions are **not** supported."
7437+
title: Dag Id Pattern
7438+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7439+
\ Regular expressions are **not** supported."
7440+
- name: dag_run_id
7441+
in: query
7442+
required: false
7443+
schema:
7444+
type: array
7445+
items:
7446+
type: string
7447+
title: Dag Run Id
7448+
- name: state
7449+
in: query
7450+
required: false
7451+
schema:
7452+
type: array
7453+
items:
7454+
type: string
7455+
title: State
7456+
- name: hitl_detail_response_received
7457+
in: query
7458+
required: false
7459+
schema:
7460+
anyOf:
7461+
- type: boolean
7462+
- type: 'null'
7463+
title: Hitl Detail Response Received
7464+
- name: hitl_detail_user_id
7465+
in: query
7466+
required: false
7467+
schema:
7468+
type: array
7469+
items:
7470+
type: string
7471+
title: Hitl Detail User Id
7472+
- name: hitl_detail_subject_search
7473+
in: query
7474+
required: false
7475+
schema:
7476+
anyOf:
7477+
- type: string
7478+
- type: 'null'
7479+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7480+
\ Regular expressions are **not** supported."
7481+
title: Hitl Detail Subject Search
7482+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7483+
\ Regular expressions are **not** supported."
7484+
- name: hitl_detail_body_search
7485+
in: query
7486+
required: false
7487+
schema:
7488+
anyOf:
7489+
- type: string
7490+
- type: 'null'
7491+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7492+
\ Regular expressions are **not** supported."
7493+
title: Hitl Detail Body Search
7494+
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
7495+
\ Regular expressions are **not** supported."
74027496
responses:
74037497
'200':
74047498
description: Successful Response
@@ -7407,19 +7501,23 @@ paths:
74077501
schema:
74087502
$ref: '#/components/schemas/HITLDetailCollection'
74097503
'401':
7410-
description: Unauthorized
74117504
content:
74127505
application/json:
74137506
schema:
74147507
$ref: '#/components/schemas/HTTPExceptionResponse'
7508+
description: Unauthorized
74157509
'403':
7416-
description: Forbidden
74177510
content:
74187511
application/json:
74197512
schema:
74207513
$ref: '#/components/schemas/HTTPExceptionResponse'
7421-
security:
7422-
- OAuth2PasswordBearer: []
7514+
description: Forbidden
7515+
'422':
7516+
description: Validation Error
7517+
content:
7518+
application/json:
7519+
schema:
7520+
$ref: '#/components/schemas/HTTPValidationError'
74237521
/api/v2/monitor/health:
74247522
get:
74257523
tags:
@@ -9891,9 +9989,8 @@ components:
98919989
description: Serializer for Plugin FastAPI root middleware responses.
98929990
HITLDetail:
98939991
properties:
9894-
ti_id:
9895-
type: string
9896-
title: Ti Id
9992+
task_instance:
9993+
$ref: '#/components/schemas/TaskInstanceResponse'
98979994
options:
98989995
items:
98999996
type: string
@@ -9950,7 +10047,7 @@ components:
995010047
default: false
995110048
type: object
995210049
required:
9953-
- ti_id
10050+
- task_instance
995410051
- options
995510052
- subject
995610053
title: HITLDetail

airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,27 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from typing import Annotated
20+
1921
import structlog
2022
from fastapi import Depends, HTTPException, status
2123
from sqlalchemy import select
2224
from sqlalchemy.orm import joinedload
2325

2426
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
2527
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
28+
from airflow.api_fastapi.common.parameters import (
29+
QueryDagIdPatternSearch,
30+
QueryDagRunIdFilter,
31+
QueryHITLDetailBodySearch,
32+
QueryHITLDetailResponseReceivedFilter,
33+
QueryHITLDetailSubjectSearch,
34+
QueryHITLDetailUserIdFilter,
35+
QueryLimit,
36+
QueryOffset,
37+
QueryTIStateFilter,
38+
SortParam,
39+
)
2640
from airflow.api_fastapi.common.router import AirflowRouter
2741
from airflow.api_fastapi.core_api.datamodels.hitl import (
2842
HITLDetail,
@@ -262,16 +276,55 @@ def get_mapped_ti_hitl_detail(
262276
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))],
263277
)
264278
def get_hitl_details(
265-
readable_ti_filter: ReadableTIFilterDep,
279+
limit: QueryLimit,
280+
offset: QueryOffset,
281+
order_by: Annotated[
282+
SortParam,
283+
Depends(
284+
SortParam(
285+
["dag_id", "run_id"],
286+
TI,
287+
).dynamic_depends(default="map_index"),
288+
),
289+
],
266290
session: SessionDep,
291+
# ti related filter
292+
readable_ti_filter: ReadableTIFilterDep,
293+
dag_id_pattern: QueryDagIdPatternSearch,
294+
dag_run_id: QueryDagRunIdFilter,
295+
ti_state: QueryTIStateFilter,
296+
# hitl detail related filter
297+
response_received: QueryHITLDetailResponseReceivedFilter,
298+
user_id: QueryHITLDetailUserIdFilter,
299+
subject_patten: QueryHITLDetailSubjectSearch,
300+
body_patten: QueryHITLDetailBodySearch,
267301
) -> HITLDetailCollection:
268302
"""Get Human-in-the-loop details."""
269-
query = select(HITLDetailModel).join(TI, HITLDetailModel.ti_id == TI.id)
303+
query = (
304+
select(HITLDetailModel)
305+
.join(TI, HITLDetailModel.ti_id == TI.id)
306+
.options(joinedload(HITLDetailModel.task_instance))
307+
)
270308
hitl_detail_select, total_entries = paginated_select(
271309
statement=query,
272-
filters=[readable_ti_filter],
310+
filters=[
311+
# ti related filter
312+
readable_ti_filter,
313+
dag_id_pattern,
314+
dag_run_id,
315+
ti_state,
316+
# hitl detail related filter
317+
response_received,
318+
user_id,
319+
subject_patten,
320+
body_patten,
321+
],
322+
offset=offset,
323+
limit=limit,
324+
order_by=order_by,
273325
session=session,
274326
)
327+
275328
hitl_details = session.scalars(hitl_detail_select)
276329
return HITLDetailCollection(
277330
hitl_details=hitl_details,

airflow-core/src/airflow/models/hitl.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ class HITLDetail(Base):
7373
@hybrid_property
7474
def response_received(self) -> bool:
7575
return self.response_at is not None
76+
77+
@response_received.expression # type: ignore[no-redef]
78+
def response_received(cls):
79+
return cls.response_at.is_not(None)

airflow-ctl/src/airflowctl/api/datamodels/generated.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -573,34 +573,6 @@ class FastAPIRootMiddlewareResponse(BaseModel):
573573
name: Annotated[str, Field(title="Name")]
574574

575575

576-
class HITLDetail(BaseModel):
577-
"""
578-
Schema for Human-in-the-loop detail.
579-
"""
580-
581-
ti_id: Annotated[str, Field(title="Ti Id")]
582-
options: Annotated[list[str], Field(title="Options")]
583-
subject: Annotated[str, Field(title="Subject")]
584-
body: Annotated[str | None, Field(title="Body")] = None
585-
defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
586-
multiple: Annotated[bool | None, Field(title="Multiple")] = False
587-
params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
588-
user_id: Annotated[str | None, Field(title="User Id")] = None
589-
response_at: Annotated[datetime | None, Field(title="Response At")] = None
590-
chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None
591-
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None
592-
response_received: Annotated[bool | None, Field(title="Response Received")] = False
593-
594-
595-
class HITLDetailCollection(BaseModel):
596-
"""
597-
Schema for a collection of Human-in-the-loop details.
598-
"""
599-
600-
hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")]
601-
total_entries: Annotated[int, Field(title="Total Entries")]
602-
603-
604576
class HITLDetailResponse(BaseModel):
605577
"""
606578
Response of updating a Human-in-the-loop detail.
@@ -1829,6 +1801,34 @@ class DagStatsCollectionResponse(BaseModel):
18291801
total_entries: Annotated[int, Field(title="Total Entries")]
18301802

18311803

1804+
class HITLDetail(BaseModel):
1805+
"""
1806+
Schema for Human-in-the-loop detail.
1807+
"""
1808+
1809+
task_instance: TaskInstanceResponse
1810+
options: Annotated[list[str], Field(title="Options")]
1811+
subject: Annotated[str, Field(title="Subject")]
1812+
body: Annotated[str | None, Field(title="Body")] = None
1813+
defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
1814+
multiple: Annotated[bool | None, Field(title="Multiple")] = False
1815+
params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
1816+
user_id: Annotated[str | None, Field(title="User Id")] = None
1817+
response_at: Annotated[datetime | None, Field(title="Response At")] = None
1818+
chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None
1819+
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None
1820+
response_received: Annotated[bool | None, Field(title="Response Received")] = False
1821+
1822+
1823+
class HITLDetailCollection(BaseModel):
1824+
"""
1825+
Schema for a collection of Human-in-the-loop details.
1826+
"""
1827+
1828+
hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")]
1829+
total_entries: Annotated[int, Field(title="Total Entries")]
1830+
1831+
18321832
class PluginCollectionResponse(BaseModel):
18331833
"""
18341834
Plugin Collection serializer.

scripts/ci/pre_commit/check_ti_vs_tis_attributes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def compare_attributes(path1, path2):
5656
"triggerer_job",
5757
"note",
5858
"rendered_task_instance_fields",
59+
"hitl_detail",
5960
# Storing last heartbeat for historic TIs is not interesting/useful
6061
"last_heartbeat_at",
6162
"id",

0 commit comments

Comments
 (0)