Description
This was previously discussed on the dev mailing list here and is part of the proposal in this doc.
User Story
As a developer building interactive, data-driven applications,
I need to trigger an Airflow DAG and receive a result value or failure status synchronously,
so I can embed Airflow DAGs in API endpoints, chat agents, or inference services without polling or custom glue code.
Background
Airflow 3.0.0 shipped support for concurrent non–data-interval DAG runs ((1) from the Synchronous DAG Execution proposal), enabling multiple users to trigger the same DAG independently with `logical_date=None`.
This unlocks foundational support for request/response workloads.
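For context, a minimal sketch of what (1) already enables, using the existing Airflow 3 REST API from a `requests` client. The DAG id, base URL, and auth header are placeholders, and the response field name reflects my reading of the v2 API:

```python
import requests

AIRFLOW = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer <token>"}  # auth handling is illustrative

# With logical_date=None, each trigger creates an independent run instead
# of colliding on a shared data interval, so concurrent callers are safe.
for _ in range(2):
    resp = requests.post(
        f"{AIRFLOW}/api/v2/dags/answer_question/dagRuns",  # hypothetical DAG id
        json={"logical_date": None},
        headers=HEADERS,
    )
    resp.raise_for_status()
    print(resp.json()["dag_run_id"])  # each run gets its own run id
```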
To make this pattern fully usable, we now need:
- (2) A way to return a result value from a DAG (via a specific task).
- (3) A way to propagate failure status early (before full DAG teardown completes).
Both are natural extensions of current capabilities and align with the proposal.
Goals
- Allow DAG authors to mark a single task as the “result task”; its output is returned.
- Provide a blocking API to wait for that result or failure.
- Surface failure status early if a critical task fails, without waiting for DAG teardown.
Design Questions
- How to choose the result task? The DAG author must mark one task; should this be enforced at compile time, or should the result task be passable via the API?
- Should this be API-only or include DSL changes? For simplicity, we could expose only an API where the result task is inferred (e.g. the final successful task or a named convention). But a DSL affordance (`@result_task`, `result.task`, or a DAG-level argument) improves clarity, type-checkability, and UI introspection.
- Return full result vs. reference? Probably return only what is already in the DB table, to avoid deserialisation and memory-size risk in the API server.
- UI changes? Optional: tag the result task in the Graph view; expose a link to the result.
Milestones
Phase 1 is the main piece; Phases 2 and 3 are optional, and we should figure out whether we need them.
Phase 1 (Airflow 3.1.0): API support for synchronous result retrieval
API options:
- Add a `wait_for_completion=true` query param to the existing trigger endpoint:
  `POST /api/v2/dags/{dag_id}/dagRuns?wait_for_completion=true`
- Or introduce a separate endpoint to fetch results:
  `GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/result?timeout=5s`
Tradeoffs (a client sketch of both options follows):
- Single-endpoint (`POST` with blocking) is simpler for clients.
- Split-endpoint (`POST` + `GET`) is more RESTful and better for long polling/retries.
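A hedged sketch of how a client might use each option. Both endpoints are proposals from this issue, not existing API, and every parameter and response detail below (`wait_for_completion`, `timeout`, the 204 "no result yet" signal, the response fields) is an assumption:

```python
import requests

BASE = "http://localhost:8080/api/v2"
HEADERS = {"Authorization": "Bearer <token>"}  # illustrative

# Option A: single blocking endpoint (proposed). The server holds the
# request open until the run reaches a terminal state or a limit is hit.
run = requests.post(
    f"{BASE}/dags/answer_question/dagRuns",
    params={"wait_for_completion": "true"},
    json={"logical_date": None},
    headers=HEADERS,
    timeout=60,  # client-side cap; the server would need its own limit too
).json()

# Option B: split endpoints (proposed), better suited to long polling
# and retries: trigger once, then poll the result endpoint.
run_id = requests.post(
    f"{BASE}/dags/answer_question/dagRuns",
    json={"logical_date": None},
    headers=HEADERS,
).json()["dag_run_id"]

while True:
    resp = requests.get(
        f"{BASE}/dags/answer_question/dagRuns/{run_id}/result",
        params={"timeout": "5s"},  # server holds the request up to 5s
        headers=HEADERS,
    )
    if resp.status_code != 204:  # assumed "no result yet" signal
        break

print(resp.json())  # assumed shape: {"state": ..., "value": ...}
```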
Phase 2 (Airflow 3.2.0 or later): DSL for defining result task (optional)
Ways to specify the result task (the last variant is sketched below):
- Decorator or helper: `@result_task` or `@task.result`
- DAG-level method: `dag.result_from("fetch_answer")`
- DAG-level argument: `dag(result_task="fetch_answer")`
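None of these affordances exist yet; purely as illustration, a sketch of the DAG-level-argument variant in the Airflow 3 TaskFlow style, where `result_task` is the hypothetical argument from the last option:

```python
from airflow.sdk import dag, task  # Airflow 3 authoring interface


@dag(result_task="fetch_answer")  # hypothetical argument, not a real parameter
def answer_question():
    @task
    def retrieve_context() -> str:
        return "some context"

    @task
    def fetch_answer(context: str) -> str:
        # Under this proposal, this task's return value (its XCom) is
        # what the synchronous API would hand back to the caller.
        return f"answer derived from {context}"

    fetch_answer(retrieve_context())


answer_question()
```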
Phase 3 (Airflow 3.2.0 or later): Early failure propagation (optional)
Return to the invoking API (caller-side handling is sketched below):
- Success: as soon as the result task finishes, without waiting for teardown (if any).
- Failure: as soon as a critical task fails (including `upstream_failed`), without waiting for full DAG completion.
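From the caller's side, the semantics might look like the following, reusing the split-endpoint shape from Phase 1; the `state`, `value`, and `error` fields are assumptions:

```python
import requests

BASE = "http://localhost:8080/api/v2"
run_id = "manual__2025-01-01T00:00:00+00:00"  # placeholder run id

body = requests.get(
    f"{BASE}/dags/answer_question/dagRuns/{run_id}/result",
    params={"timeout": "5s"},  # proposed long-poll parameter
).json()

if body["state"] == "success":
    # Delivered as soon as the result task finished, even if teardown
    # tasks are still running.
    print(body["value"])
elif body["state"] == "failed":
    # Delivered as soon as a critical task failed or was marked
    # upstream_failed, before the rest of the run wound down.
    raise RuntimeError(body.get("error", "DAG run failed"))
```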