
Enable Inference Execution / Synchronous DAG Execution #51711

@kaxil

This was previously discussed on the dev mailing list here and is part of the proposal in this doc.

User Story

As a developer building interactive, data-driven applications,
I need to trigger an Airflow DAG and receive a result value or failure status synchronously,
so I can embed Airflow DAGs in API endpoints, chat agents, or inference services without polling or custom glue code.

Background

Airflow 3.0.0 shipped support for concurrent non–data-interval DAG runs ((1) from the Synchronous DAG Execution proposal), enabling multiple users to trigger the same DAG independently with logical_date=None.

This unlocks foundational support for request/response workloads.
To make this pattern fully usable, we now need:

  • (2) A way to return a result value from a DAG (via a specific task).
  • (3) A way to propagate failure status early (before full DAG teardown completes).

Both are natural extensions of current capabilities and align with the proposal.
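
For context, here is a minimal sketch of what (1) already enables, assuming a local Airflow 3 API server and an illustrative answer_question DAG (the base URL, token, DAG ID, and conf payload are placeholders, not part of this proposal):

  import requests

  BASE = "http://localhost:8080/api/v2"          # assumed API server URL
  HEADERS = {"Authorization": "Bearer <token>"}  # auth is deployment-specific

  # Trigger the same DAG twice, independently; logical_date=None lets the
  # runs coexist instead of competing for a data interval.
  for question in ("What is our refund policy?", "Summarise the open incident"):
      resp = requests.post(
          f"{BASE}/dags/answer_question/dagRuns",
          headers=HEADERS,
          json={"logical_date": None, "conf": {"question": question}},
      )
      resp.raise_for_status()
      print(resp.json()["dag_run_id"])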

Goals

  • Allow DAG authors to mark a single task as the “result task”; its output is returned.
  • Provide a blocking API to wait for that result or failure.
  • Surface failure status early if a critical task fails, without waiting for DAG teardown.

Design Questions

  1. How to choose the result task?
    The DAG author must mark one task; should we enforce this at compile time, or allow passing it via the API?

  2. Should this be API-only or include DSL changes?
    For simplicity, we could expose only an API where the result task is inferred (e.g., the final successful task or a naming convention). But a DSL affordance (@result_task, result.task, or a DAG-level argument) improves clarity, type-checkability, and UI introspection.

  3. Return full result vs. reference?
    Probably return only what is already stored in the DB table, to avoid deserialisation and memory-size risk in the API server.

  4. UI changes?
    Optional: tag the result task in the Graph view; expose a link to the result.

Milestones

Phase 1 is the main piece; Phase 2 and Phase 3 are both optional, and we should figure out whether we need them.

Phase 1 (Airflow 3.1.0): API support for synchronous result retrieval

API options:

  • Add wait_for_completion=true query param to the existing trigger endpoint:
    POST /api/v2/dags/{dag_id}/dagRuns?wait_for_completion=true
  • Or introduce a separate endpoint to fetch results:
    GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/result?timeout=5s

Tradeoffs:

  • Single-endpoint (POST with blocking) is simpler for clients
  • Split-endpoint (POST + GET) is more RESTful and better for long polling/retries
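
To make the tradeoff concrete, here is a hedged client-side sketch of both shapes (the endpoints are the options above; the 202-means-pending behaviour and the response fields are assumptions, not settled API):

  import time

  import requests

  BASE = "http://localhost:8080/api/v2"          # assumed API server URL
  HEADERS = {"Authorization": "Bearer <token>"}  # illustrative auth

  # Option A: single blocking call; the server holds the connection open
  # until the run finishes, so the client must budget a generous timeout.
  run = requests.post(
      f"{BASE}/dags/answer_question/dagRuns",
      params={"wait_for_completion": "true"},
      headers=HEADERS,
      json={"logical_date": None, "conf": {"question": "What is our refund policy?"}},
      timeout=60,
  ).json()

  # Option B: trigger, then long-poll the result endpoint in short slices.
  run = requests.post(
      f"{BASE}/dags/answer_question/dagRuns",
      headers=HEADERS,
      json={"logical_date": None},
  ).json()
  while True:
      resp = requests.get(
          f"{BASE}/dags/answer_question/dagRuns/{run['dag_run_id']}/result",
          params={"timeout": "5s"},
          headers=HEADERS,
      )
      if resp.status_code != 202:  # assumed: 202 means "not ready yet"
          break
      time.sleep(1)
  print(resp.json())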

Phase 2 (Airflow 3.2.0 or later): DSL for defining result task (optional)

Ways to specify the result task:

  • Decorator or helper:
    @result_task or @task.result
  • DAG-level method:
    dag.result_from("fetch_answer")
  • DAG-level argument:
    dag(result_task="fetch_answer")
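
None of these exist today; purely as a sketch of the DAG-level-argument option, using the Airflow 3 authoring interface and the proposed (hypothetical) result_task parameter, an answer_question DAG might read:

  from airflow.sdk import dag, task

  # result_task is the proposed argument from the third option above;
  # it does not exist in Airflow yet.
  @dag(result_task="fetch_answer")
  def answer_question():
      @task
      def retrieve_context() -> str:
          return "context fetched from a store"

      @task
      def fetch_answer(context: str) -> str:
          # This task's return value (its XCom) is what the synchronous
          # API would hand back to the caller.
          return f"answer derived from: {context}"

      fetch_answer(retrieve_context())

  answer_question()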

Phase 3 (Airflow 3.2.0 or later): Early failure propagation (optional)

Return to the invoking API caller:

  • Success: As soon as the result task finishes, without waiting for teardown (if any).
  • Failure: As soon as a critical task fails (including upstream_failed), without waiting for full DAG completion.
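
In client terms, the difference is that the result call returns as soon as the outcome is known rather than when the run reaches a terminal state; a sketch, where the state and result fields are assumptions about the eventual response shape:

  import requests

  BASE = "http://localhost:8080/api/v2"          # assumed API server URL
  HEADERS = {"Authorization": "Bearer <token>"}  # illustrative auth
  run_id = "manual__2025-01-01T00:00:00"         # a previously triggered run

  body = requests.get(
      f"{BASE}/dags/answer_question/dagRuns/{run_id}/result",
      params={"timeout": "5s"},
      headers=HEADERS,
  ).json()

  if body["state"] == "success":
      # Arrives the moment the result task finishes, even if teardown
      # tasks are still running.
      print(body["result"])
  else:
      # Arrives as soon as a critical task fails (including
      # upstream_failed), before the rest of the run completes.
      raise RuntimeError(f"DAG run ended in state {body['state']}")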

