diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index c0b2e81c71815..073e2c9c8a38e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2147,6 +2147,90 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait: + get: + tags: + - DagRun + - experimental + summary: 'Experimental: Wait for a dag run to complete, and return task results + if requested.' + description: "\U0001F6A7 This is an experimental endpoint and may change or\ + \ be removed without notice." + operationId: wait_dag_run_until_finished + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: interval + in: query + required: true + schema: + type: number + exclusiveMinimum: 0.0 + description: Seconds to wait between dag run state checks + title: Interval + description: Seconds to wait between dag run state checks + - name: result + in: query + required: false + schema: + anyOf: + - type: array + items: + type: string + - type: 'null' + description: Collect result XCom from task. Can be set multiple times. + title: Result + description: Collect result XCom from task. Can be set multiple times. + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + application/x-ndjson: + schema: + type: string + example: '{"state": "running"} + + {"state": "success", "results": {"op": 42}} + + ' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/list: post: tags: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 8f9b11b3de78a..5d974ec012cd5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -17,11 +17,13 @@ from __future__ import annotations +import textwrap from typing import Annotated, Literal, cast import structlog from fastapi import Depends, HTTPException, Query, status from fastapi.exceptions import RequestValidationError +from fastapi.responses import StreamingResponse from pydantic import ValidationError from sqlalchemy import select from sqlalchemy.orm import joinedload @@ -51,6 +53,7 @@ search_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse from airflow.api_fastapi.core_api.datamodels.dag_run import ( DAGRunClearBody, @@ -72,6 +75,7 @@ requires_access_asset, requires_access_dag, ) +from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager @@ -438,6 +442,57 @@ def trigger_dag_run( raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) +@dag_run_router.get( + "/{dag_run_id}/wait", + tags=["experimental"], + summary="Experimental: Wait for a dag run to complete, and return task results if requested.", + description="🚧 This is an experimental endpoint and may change or be removed without notice.", + responses={ + **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + status.HTTP_200_OK: { + "description": "Successful Response", + "content": { + Mimetype.NDJSON: { + "schema": { + "type": "string", + "example": textwrap.dedent( + """\ + {"state": "running"} + {"state": "success", "results": {"op": 42}} + """ + ), + } + } + }, + }, + }, + dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))], +) +def wait_dag_run_until_finished( + dag_id: str, + dag_run_id: str, + session: SessionDep, + interval: Annotated[float, Query(gt=0.0, description="Seconds to wait between dag run state checks")], + result_task_ids: Annotated[ + list[str] | None, + Query(alias="result", description="Collect result XCom from task. Can be set multiple times."), + ] = None, +): + "Wait for a dag run until it finishes, and return its result(s)." + if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)): + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + waiter = DagRunWaiter( + dag_id=dag_id, + run_id=dag_run_id, + interval=interval, + result_task_ids=result_task_ids, + ) + return StreamingResponse(waiter.wait()) + + @dag_run_router.post( "/list", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py new file mode 100644 index 0000000000000..259389e799494 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import asyncio +import itertools +import json +import operator +from typing import TYPE_CHECKING, Any + +import attrs +from sqlalchemy import select + +from airflow.models.dagrun import DagRun +from airflow.models.xcom import XCOM_RETURN_KEY, XComModel +from airflow.utils.session import create_session_async +from airflow.utils.state import State + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Iterator + + +@attrs.define +class DagRunWaiter: + """Wait for the specified dag run to finish, and collect info from it.""" + + dag_id: str + run_id: str + interval: float + result_task_ids: list[str] | None + + async def _get_dag_run(self) -> DagRun: + async with create_session_async() as session: + return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id)) + + def _serialize_xcoms(self) -> dict[str, Any]: + xcom_query = XComModel.get_many( + run_id=self.run_id, + key=XCOM_RETURN_KEY, + task_ids=self.result_task_ids, + dag_ids=self.dag_id, + ) + xcom_query = xcom_query.order_by(XComModel.task_id, XComModel.map_index) + + def _group_xcoms(g: Iterator[XComModel]) -> Any: + entries = list(g) + if len(entries) == 1 and entries[0].map_index < 0: # Unpack non-mapped task xcom. + return entries[0].value + return [entry.value for entry in entries] # Task is mapped; return all xcoms in a list. + + return { + task_id: _group_xcoms(g) + for task_id, g in itertools.groupby(xcom_query, key=operator.attrgetter("task_id")) + } + + def _serialize_response(self, dag_run: DagRun) -> str: + resp = {"state": dag_run.state} + if dag_run.state not in State.finished_dr_states: + return json.dumps(resp) + if self.result_task_ids: + resp["results"] = self._serialize_xcoms() + return json.dumps(resp) + + async def wait(self) -> AsyncGenerator[str, None]: + yield self._serialize_response(dag_run := await self._get_dag_run()) + yield "\n" + while dag_run.state not in State.finished_dr_states: + await asyncio.sleep(self.interval) + yield self._serialize_response(dag_run := await self._get_dag_run()) + yield "\n" diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index 689ce2e4e6819..ca8de146775e2 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -22,7 +22,6 @@ import json import logging import os -import platform import sys import warnings from collections.abc import Callable @@ -321,6 +320,20 @@ def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool: return True +def _configure_async_session(): + global async_engine + global AsyncSession + + async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) + AsyncSession = sessionmaker( + bind=async_engine, + autocommit=False, + autoflush=False, + class_=SAAsyncSession, + expire_on_commit=False, + ) + + def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy.""" from airflow.sdk.execution_time.secrets_masker import mask_secret @@ -335,8 +348,6 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global Session global engine - global async_engine - global AsyncSession global NonScopedSession if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": @@ -359,34 +370,24 @@ def configure_orm(disable_connection_pool=False, pool_class=None): connect_args["check_same_thread"] = False engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) - async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) - AsyncSession = sessionmaker( - bind=async_engine, - autocommit=False, - autoflush=False, - class_=SAAsyncSession, - expire_on_commit=False, - ) mask_secret(engine.url.password) - setup_event_handlers(engine) if conf.has_option("database", "sql_alchemy_session_maker"): _session_maker = conf.getimport("database", "sql_alchemy_session_maker") else: - - def _session_maker(_engine): - return sessionmaker( - autocommit=False, - autoflush=False, - bind=_engine, - expire_on_commit=False, - ) - + _session_maker = functools.partial( + sessionmaker, + autocommit=False, + autoflush=False, + expire_on_commit=False, + ) NonScopedSession = _session_maker(engine) Session = scoped_session(NonScopedSession) - if not platform.system() == "Windows": + _configure_async_session() + + if register_at_fork := getattr(os, "register_at_fork", None): # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork def clean_in_fork(): _globals = globals() @@ -396,7 +397,7 @@ def clean_in_fork(): async_engine.sync_engine.dispose(close=False) # Won't work on Windows - os.register_at_fork(after_in_child=clean_in_fork) + register_at_fork(after_in_child=clean_in_fork) DEFAULT_ENGINE_ARGS = { diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 4c76141a70d93..8410c0ba4748b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryResult } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; export type AssetServiceGetAssetsDefaultResponse = Awaited>; export type AssetServiceGetAssetsQueryResult = UseQueryResult; @@ -159,6 +159,24 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGte, endDateLte, updatedAtGte?: string; updatedAtLte?: string; }, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }])]; +export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; +export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; +export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; +export const UseDagRunServiceWaitDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: Array) => [useDagRunServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval, result }])]; +export type ExperimentalServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; +export type ExperimentalServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; +export const useExperimentalServiceWaitDagRunUntilFinishedKey = "ExperimentalServiceWaitDagRunUntilFinished"; +export const UseExperimentalServiceWaitDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: Array) => [useExperimentalServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval, result }])]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited>; export type DagSourceServiceGetDagSourceQueryResult = UseQueryResult; export const useDagSourceServiceGetDagSourceKey = "DagSourceServiceGetDagSource"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 99e3a1f56aaac..7ccd7ca1a9de4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,6 +296,40 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtLte?: string; }) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); +/** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const ensureUseExperimentalServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}) => queryClient.ensureQueryData({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index aff2296830eda..60fc346fdc62a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,6 +296,40 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtLte?: string; }) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); +/** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const prefetchUseExperimentalServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}) => queryClient.prefetchQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 86294553c8dd2..d2a6abe0a6b22 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,6 +296,40 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceWaitDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); +/** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useExperimentalServiceWaitDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 6ce893418107b..a8b84a5a173a3 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,6 +296,40 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); +/** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.result Collect result XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useExperimentalServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval, result }: { + dagId: string; + dagRunId: string; + interval: number; + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d3dc37b8e662d..ba77ae0668075 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; export class AssetService { /** @@ -1050,6 +1050,38 @@ export class DagRunService { }); } + /** + * Experimental: Wait for a dag run to complete, and return task results if requested. + * 🚧 This is an experimental endpoint and may change or be removed without notice. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.interval Seconds to wait between dag run state checks + * @param data.result Collect result XCom from task. Can be set multiple times. + * @returns unknown Successful Response + * @throws ApiError + */ + public static waitDagRunUntilFinished(data: WaitDagRunUntilFinishedData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + query: { + interval: data.interval, + result: data.result + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Get List Dag Runs Batch * Get a list of DAG Runs. @@ -1079,6 +1111,41 @@ export class DagRunService { } +export class ExperimentalService { + /** + * Experimental: Wait for a dag run to complete, and return task results if requested. + * 🚧 This is an experimental endpoint and may change or be removed without notice. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.interval Seconds to wait between dag run state checks + * @param data.result Collect result XCom from task. Can be set multiple times. + * @returns unknown Successful Response + * @throws ApiError + */ + public static waitDagRunUntilFinished(data: WaitDagRunUntilFinishedData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + query: { + interval: data.interval, + result: data.result + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + +} + export class DagSourceService { /** * Get Dag Source diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index b1590d732fe23..80b644ce63f73 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2216,6 +2216,21 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; +export type WaitDagRunUntilFinishedData = { + dagId: string; + dagRunId: string; + /** + * Seconds to wait between dag run state checks + */ + interval: number; + /** + * Collect result XCom from task. Can be set multiple times. + */ + result?: Array<(string)> | null; +}; + +export type WaitDagRunUntilFinishedResponse = unknown; + export type GetListDagRunsBatchData = { dagId: "~"; requestBody: DAGRunsBatchBody; @@ -3979,6 +3994,33 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { + get: { + req: WaitDagRunUntilFinishedData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/list': { post: { req: GetListDagRunsBatchData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index acb054cb3f9a6..6ce599e0c675b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -104,12 +104,12 @@ def setup(request, dag_maker, session=None): dag_run1.note = (DAG1_RUN1_NOTE, "not_test") - for task in [task1, task2]: + for i, task in enumerate([task1, task2], start=1): ti = dag_run1.get_task_instance(task_id=task.task_id) ti.task = task ti.state = State.SUCCESS - session.merge(ti) + ti.xcom_push("return_value", f"result_{i}") dag_run2 = dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, @@ -1621,3 +1621,57 @@ def test_should_respond_200_with_null_logical_date(self, test_client): "conf": {}, "note": None, } + + +class TestWaitDagRun: + # The way we init async engine does not work well with FastAPI app init. + # Creating the engine implicitly creates an event loop, which Airflow does + # once for the entire process; creating the FastAPI app also does, but our + # test setup does it once for each test. I don't know how to properly fix + # this without rewriting how Airflow does db; re-configuring the db for each + # test at least makes the tests run correctly. + @pytest.fixture(autouse=True) + def reconfigure_async_db_engine(self): + from airflow.settings import _configure_async_session + + _configure_async_session() + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", + params={"interval": "1"}, + ) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", + params={"interval": "1"}, + ) + assert response.status_code == 403 + + def test_should_respond_404(self, test_client): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait", params={"interval": "1"}) + assert response.status_code == 404 + + def test_should_respond_422_without_interval_param(self, test_client): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait") + assert response.status_code == 422 + + @pytest.mark.parametrize( + "run_id, state", + [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)], + ) + def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait", params={"interval": "100"}) + assert response.status_code == 200 + data = response.json() + assert data == {"state": state} + + def test_collect_task(self, test_client): + response = test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", params={"interval": "1", "result": "task_1"} + ) + assert response.status_code == 200 + data = response.json() + assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": '"result_1"'}}