Skip to content

Commit 62dcda1

Browse files
guan404mingsanederchik
authored andcommitted
Add TI bulk actions endpoint (apache#50443)
1 parent 42d558e commit 62dcda1

File tree

11 files changed

+1247
-55
lines changed

11 files changed

+1247
-55
lines changed

airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,10 @@ def validate_new_state(cls, ns: str | None) -> str:
224224
if ns not in valid_states:
225225
raise ValueError(f"'{ns}' is not one of {valid_states}")
226226
return ns
227+
228+
229+
class BulkTaskInstanceBody(PatchTaskInstanceBody, StrictBaseModel):
230+
"""Request body for bulk update, and delete task instances."""
231+
232+
task_id: str
233+
map_index: int | None = None

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5568,6 +5568,58 @@ paths:
55685568
application/json:
55695569
schema:
55705570
$ref: '#/components/schemas/HTTPValidationError'
5571+
patch:
5572+
tags:
5573+
- Task Instance
5574+
summary: Bulk Task Instances
5575+
description: Bulk update, and delete task instances.
5576+
operationId: bulk_task_instances
5577+
security:
5578+
- OAuth2PasswordBearer: []
5579+
parameters:
5580+
- name: dag_id
5581+
in: path
5582+
required: true
5583+
schema:
5584+
type: string
5585+
title: Dag Id
5586+
- name: dag_run_id
5587+
in: path
5588+
required: true
5589+
schema:
5590+
type: string
5591+
title: Dag Run Id
5592+
requestBody:
5593+
required: true
5594+
content:
5595+
application/json:
5596+
schema:
5597+
$ref: '#/components/schemas/BulkBody_BulkTaskInstanceBody_'
5598+
responses:
5599+
'200':
5600+
description: Successful Response
5601+
content:
5602+
application/json:
5603+
schema:
5604+
$ref: '#/components/schemas/BulkResponse'
5605+
'401':
5606+
content:
5607+
application/json:
5608+
schema:
5609+
$ref: '#/components/schemas/HTTPExceptionResponse'
5610+
description: Unauthorized
5611+
'403':
5612+
content:
5613+
application/json:
5614+
schema:
5615+
$ref: '#/components/schemas/HTTPExceptionResponse'
5616+
description: Forbidden
5617+
'422':
5618+
description: Validation Error
5619+
content:
5620+
application/json:
5621+
schema:
5622+
$ref: '#/components/schemas/HTTPValidationError'
55715623
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/list:
55725624
post:
55735625
tags:
@@ -7389,6 +7441,21 @@ components:
73897441
73907442
This structure helps users understand which key actions succeeded and which
73917443
failed.'
7444+
BulkBody_BulkTaskInstanceBody_:
7445+
properties:
7446+
actions:
7447+
items:
7448+
oneOf:
7449+
- $ref: '#/components/schemas/BulkCreateAction_BulkTaskInstanceBody_'
7450+
- $ref: '#/components/schemas/BulkUpdateAction_BulkTaskInstanceBody_'
7451+
- $ref: '#/components/schemas/BulkDeleteAction_BulkTaskInstanceBody_'
7452+
type: array
7453+
title: Actions
7454+
additionalProperties: false
7455+
type: object
7456+
required:
7457+
- actions
7458+
title: BulkBody[BulkTaskInstanceBody]
73927459
BulkBody_ConnectionBody_:
73937460
properties:
73947461
actions:
@@ -7434,6 +7501,28 @@ components:
74347501
required:
74357502
- actions
74367503
title: BulkBody[VariableBody]
7504+
BulkCreateAction_BulkTaskInstanceBody_:
7505+
properties:
7506+
action:
7507+
type: string
7508+
const: create
7509+
title: Action
7510+
description: The action to be performed on the entities.
7511+
entities:
7512+
items:
7513+
$ref: '#/components/schemas/BulkTaskInstanceBody'
7514+
type: array
7515+
title: Entities
7516+
description: A list of entities to be created.
7517+
action_on_existence:
7518+
$ref: '#/components/schemas/BulkActionOnExistence'
7519+
default: fail
7520+
additionalProperties: false
7521+
type: object
7522+
required:
7523+
- action
7524+
- entities
7525+
title: BulkCreateAction[BulkTaskInstanceBody]
74377526
BulkCreateAction_ConnectionBody_:
74387527
properties:
74397528
action:
@@ -7500,6 +7589,28 @@ components:
75007589
- action
75017590
- entities
75027591
title: BulkCreateAction[VariableBody]
7592+
BulkDeleteAction_BulkTaskInstanceBody_:
7593+
properties:
7594+
action:
7595+
type: string
7596+
const: delete
7597+
title: Action
7598+
description: The action to be performed on the entities.
7599+
entities:
7600+
items:
7601+
type: string
7602+
type: array
7603+
title: Entities
7604+
description: A list of entity id/key to be deleted.
7605+
action_on_non_existence:
7606+
$ref: '#/components/schemas/BulkActionNotOnExistence'
7607+
default: fail
7608+
additionalProperties: false
7609+
type: object
7610+
required:
7611+
- action
7612+
- entities
7613+
title: BulkDeleteAction[BulkTaskInstanceBody]
75037614
BulkDeleteAction_ConnectionBody_:
75047615
properties:
75057616
action:
@@ -7599,6 +7710,70 @@ components:
75997710
76007711
Fields are populated in the response only if the respective action was part
76017712
of the request, else are set None.'
7713+
BulkTaskInstanceBody:
7714+
properties:
7715+
new_state:
7716+
anyOf:
7717+
- $ref: '#/components/schemas/TaskInstanceState'
7718+
- type: 'null'
7719+
note:
7720+
anyOf:
7721+
- type: string
7722+
maxLength: 1000
7723+
- type: 'null'
7724+
title: Note
7725+
include_upstream:
7726+
type: boolean
7727+
title: Include Upstream
7728+
default: false
7729+
include_downstream:
7730+
type: boolean
7731+
title: Include Downstream
7732+
default: false
7733+
include_future:
7734+
type: boolean
7735+
title: Include Future
7736+
default: false
7737+
include_past:
7738+
type: boolean
7739+
title: Include Past
7740+
default: false
7741+
task_id:
7742+
type: string
7743+
title: Task Id
7744+
map_index:
7745+
anyOf:
7746+
- type: integer
7747+
- type: 'null'
7748+
title: Map Index
7749+
additionalProperties: false
7750+
type: object
7751+
required:
7752+
- task_id
7753+
title: BulkTaskInstanceBody
7754+
description: Request body for bulk update, and delete task instances.
7755+
BulkUpdateAction_BulkTaskInstanceBody_:
7756+
properties:
7757+
action:
7758+
type: string
7759+
const: update
7760+
title: Action
7761+
description: The action to be performed on the entities.
7762+
entities:
7763+
items:
7764+
$ref: '#/components/schemas/BulkTaskInstanceBody'
7765+
type: array
7766+
title: Entities
7767+
description: A list of entities to be updated.
7768+
action_on_non_existence:
7769+
$ref: '#/components/schemas/BulkActionNotOnExistence'
7770+
default: fail
7771+
additionalProperties: false
7772+
type: object
7773+
required:
7774+
- action
7775+
- entities
7776+
title: BulkUpdateAction[BulkTaskInstanceBody]
76027777
BulkUpdateAction_ConnectionBody_:
76037778
properties:
76047779
action:

airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 24 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
import structlog
2323
from fastapi import Depends, HTTPException, Query, status
24-
from fastapi.exceptions import RequestValidationError
25-
from pydantic import ValidationError
2624
from sqlalchemy import or_, select
2725
from sqlalchemy.orm import joinedload
2826
from sqlalchemy.sql.selectable import Select
@@ -51,7 +49,9 @@
5149
float_range_filter_factory,
5250
)
5351
from airflow.api_fastapi.common.router import AirflowRouter
52+
from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse
5453
from airflow.api_fastapi.core_api.datamodels.task_instances import (
54+
BulkTaskInstanceBody,
5555
ClearTaskInstancesBody,
5656
PatchTaskInstanceBody,
5757
TaskDependencyCollectionResponse,
@@ -63,11 +63,14 @@
6363
)
6464
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
6565
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
66+
from airflow.api_fastapi.core_api.services.public.task_instances import (
67+
BulkTaskInstanceService,
68+
_patch_ti_validate_request,
69+
)
6670
from airflow.api_fastapi.logging.decorators import action_logging
6771
from airflow.exceptions import TaskNotFound
6872
from airflow.listeners.listener import get_listener_manager
6973
from airflow.models import Base, DagRun
70-
from airflow.models.dag import DAG
7174
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
7275
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
7376
from airflow.ti_deps.dep_context import DepContext
@@ -728,54 +731,6 @@ def post_clear_task_instances(
728731
)
729732

730733

731-
def _patch_ti_validate_request(
732-
dag_id: str,
733-
dag_run_id: str,
734-
task_id: str,
735-
dag_bag: DagBagDep,
736-
body: PatchTaskInstanceBody,
737-
session: SessionDep,
738-
map_index: int | None = -1,
739-
update_mask: list[str] | None = Query(None),
740-
) -> tuple[DAG, list[TI], dict]:
741-
dag = dag_bag.get_dag(dag_id)
742-
if not dag:
743-
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not found")
744-
745-
if not dag.has_task(task_id):
746-
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task '{task_id}' not found in DAG '{dag_id}'")
747-
748-
query = (
749-
select(TI)
750-
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
751-
.join(TI.dag_run)
752-
.options(joinedload(TI.rendered_task_instance_fields))
753-
)
754-
if map_index is not None:
755-
query = query.where(TI.map_index == map_index)
756-
else:
757-
query = query.order_by(TI.map_index)
758-
759-
tis = session.scalars(query).all()
760-
761-
err_msg_404 = (
762-
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
763-
)
764-
if len(tis) == 0:
765-
raise HTTPException(status.HTTP_404_NOT_FOUND, err_msg_404)
766-
767-
fields_to_update = body.model_fields_set
768-
if update_mask:
769-
fields_to_update = fields_to_update.intersection(update_mask)
770-
else:
771-
try:
772-
PatchTaskInstanceBody.model_validate(body)
773-
except ValidationError as e:
774-
raise RequestValidationError(errors=e.errors())
775-
776-
return dag, list(tis), body.model_dump(include=fields_to_update, by_alias=True)
777-
778-
779734
@task_instances_router.patch(
780735
task_instances_prefix + "/{task_id}/dry_run",
781736
responses=create_openapi_http_exception_doc(
@@ -835,6 +790,24 @@ def patch_task_instance_dry_run(
835790
)
836791

837792

793+
@task_instances_router.patch(
794+
task_instances_prefix,
795+
dependencies=[Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE))],
796+
)
797+
def bulk_task_instances(
798+
request: BulkBody[BulkTaskInstanceBody],
799+
session: SessionDep,
800+
dag_id: str,
801+
dag_bag: DagBagDep,
802+
dag_run_id: str,
803+
user: GetUserDep,
804+
) -> BulkResponse:
805+
"""Bulk update, and delete task instances."""
806+
return BulkTaskInstanceService(
807+
session=session, request=request, dag_id=dag_id, dag_run_id=dag_run_id, dag_bag=dag_bag, user=user
808+
).handle_request()
809+
810+
838811
@task_instances_router.patch(
839812
task_instances_prefix + "/{task_id}",
840813
responses=create_openapi_http_exception_doc(

0 commit comments

Comments
 (0)