Improve xcom_pull to cover different scenarios for mapped tasks #51568

Merged
merged 9 commits into from
Jun 12, 2025

amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Jun 10, 2025

closes: #50686

Problem

The expected behaviour when a downstream task pulls XComs from a mapped task is to pull ALL the XComs for that mapped task (every map index). The problem we currently have lies in how our logic treats defaults.

The logic is broken: if map indexes aren't specified, we pull only from the corresponding map index of the upstream task, rather than from all of them. This is because of the way we handle defaults as of now:

https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L349-L360

Scenarios that show how things work:

image

This is wrong. If map indexes are not specified, we should pull from all map indexes of the upstream task.

Approach

This leverages #50117. If start, end, and step are all passed as None, the API is designed to return ALL the XComs for the task, which is exactly what we want.

So I am changing the logic to: "if map_indexes aren't provided, fetch all the map_indexes available".
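
The change in defaults can be illustrated with a toy model. This is a minimal sketch, not the actual task_runner.py code: the in-memory store, the `pull_old` and `pull_new` helpers, and the `_UNSET` sentinel are hypothetical names made up for illustration, and the model assumes an unmapped task is stored under map index -1. Return-shape handling for unmapped tasks is deliberately left out.

```python
# Toy model only: every name here is hypothetical, none of this is an Airflow API.

_UNSET = object()  # stands in for "map_indexes was not provided"

# (task_id, map_index) -> value. Assumption: unmapped tasks use map_index -1.
STORE = {
    ("provide_map", -1): [1, 2, 3, 4],
    ("push_xcom", 0): [1, 1],
    ("push_xcom", 1): [2, 2],
    ("push_xcom", 2): [3, 3],
    ("push_xcom", 3): [4, 4],
}


def pull_old(store, task_id, caller_map_index, map_indexes=_UNSET):
    """Old default: an unspecified map index falls back to the caller's own."""
    if map_indexes is _UNSET:
        map_indexes = caller_map_index
    return store.get((task_id, map_indexes))


def pull_new(store, task_id, map_indexes=_UNSET):
    """New default: an unspecified map index means 'every map index'."""
    if map_indexes is _UNSET:
        indexes = sorted(idx for (tid, idx) in store if tid == task_id)
        return [store[(task_id, idx)] for idx in indexes]
    return store.get((task_id, map_indexes))
```

In this model an unmapped downstream task (map index -1) calling `pull_old(STORE, "push_xcom", caller_map_index=-1)` finds nothing, because no XCom exists at `("push_xcom", -1)`, while `pull_new(STORE, "push_xcom")` returns the values for all four map indexes in order.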

Testing

Test 1: Checking with DAG provided by @TJaniF

DAG:

from airflow.sdk import dag, task, chain


@dag
def custom_xcom_backend_test():

    @task
    def provide_map():
        return [1, 2, 3, 4]

    @task
    def push_xcom(map_num):
        print(map_num)
        return [map_num]*2

    _push_xcom = push_xcom.expand(map_num=provide_map())

    @task
    def pull_xcom_explicit(**context):

        my_xcom_unmapped = context["ti"].xcom_pull(
            dag_id="custom_xcom_backend_test",
            task_ids=["provide_map"],
            key="return_value",
        )
        print("XCom from unmapped task:", my_xcom_unmapped)

        my_xcom = context["ti"].xcom_pull(
            dag_id="custom_xcom_backend_test",
            task_ids=["push_xcom"],
            key="return_value",
        )
        print("XCom from mapped task:", my_xcom)

    chain(
        _push_xcom,
        pull_xcom_explicit(),
    )


custom_xcom_backend_test()

image

Test 2: DAG reported by issue reporter

DAG:

from datetime import datetime
from textwrap import dedent

from airflow import DAG
from airflow.decorators import task
from airflow.providers.standard.operators.bash import BashOperator


@task
def build_something():
    return "Show this"


@task
def build_multiple():
    return [{"message": "Also this"}, {"message": "Also that"}]


with DAG(
    "mve-xcom-jinja",
    default_args={
        "depends_on_past": False,
    },
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    something = build_something()

    multiple = something >> build_multiple()

    _ = multiple >> BashOperator.partial(
        task_id="show",
        bash_command=dedent(
            """
            echo "{{ task_instance.xcom_pull(task_ids='build_something') }}"
            echo "$message"
            """
        ),
    ).expand(env=multiple)

image

Test 3: Created a comprehensive DAG to cover all use cases

from airflow.sdk import dag, task, chain


@dag
def xcom_pull_test():
    """Focused test DAG for key xcom_pull scenarios."""

    @task
    def provide_map():
        return [1, 2, 3, 4]

    @task
    def unmapped_task():
        return "unmapped_value"

    @task
    def another_unmapped_task():
        return {"key": "another_value"}

    @task
    def mapped_task(map_num):
        return [map_num] * 2  # [1,1], [2,2], [3,3], [4,4]

    # Create mapped task
    _mapped_task = mapped_task.expand(map_num=provide_map())

    @task
    def test_xcom_pull(**context):
        """Test key xcom_pull scenarios."""
        ti = context["ti"]

        print("=== XCOM PULL TESTS ===")

        # 1. Single task ID as string vs list
        print("\n1. SINGLE TASK ID:")
        result1 = ti.xcom_pull(task_ids="unmapped_task")
        result2 = ti.xcom_pull(task_ids=["unmapped_task"])
        print(f"   String: {result1}")
        print(f"   List:   {result2}")

        # 2. Multiple task IDs (unmapped)
        print("\n2. MULTIPLE UNMAPPED TASKS:")
        result = ti.xcom_pull(task_ids=["unmapped_task", "another_unmapped_task"])
        print(f"   Result: {result}")

        # 3. Mapped task WITHOUT map_indexes (NEW: should get all)
        print("\n3. MAPPED TASK - NO MAP_INDEXES:")
        result = ti.xcom_pull(task_ids="mapped_task")
        print(f"   All values: {result}")

        # 4. Mapped task WITH specific map_indexes
        print("\n4. MAPPED TASK - WITH MAP_INDEXES:")
        result1 = ti.xcom_pull(task_ids="mapped_task", map_indexes=1)
        result2 = ti.xcom_pull(task_ids="mapped_task", map_indexes=[0, 2])
        print(f"   Single index (1): {result1}")
        print(f"   Multiple indexes [0,2]: {result2}")

        # 5. Multiple task IDs with mapped task (mixed)
        print("\n5. MULTIPLE TASKS (MIXED):")
        result = ti.xcom_pull(task_ids=["unmapped_task", "mapped_task"])
        print(f"   Mixed: {result}")

        # 6. Multiple mapped tasks
        print("\n6. MULTIPLE MAPPED TASKS:")
        result = ti.xcom_pull(task_ids=["mapped_task"])
        print(f"   Result: {result}")

    chain(
        [provide_map(), unmapped_task(), another_unmapped_task(), _mapped_task],
        test_xcom_pull(),
    )


xcom_pull_test()

This DAG handles all possible scenarios:

  1. Single task ID as a string: the XCom value itself must be returned
  2. Single task ID in a list: the XCom must be returned in a list
  3. Multiple unmapped task IDs: the result must be a list
  4. Mapped task without map indexes: fetch the XComs for all map indexes
  5. Mapped task with map indexes provided: fetch only for those map indexes
  6. Multiple task IDs including a mapped task: get the mixed result in a list
  7. Multiple mapped tasks: get the combination.
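
The return shapes listed above can be mimicked with a small stand-alone model. This is a sketch under stated assumptions, not Airflow's implementation: `toy_xcom_pull` and the `XCOMS` dictionary are hypothetical, `None` stands in for "map_indexes not provided", and unmapped tasks are assumed to live at map index -1. The shaping rule is inferred from the log output of this test DAG.

```python
# Toy model only: toy_xcom_pull and XCOMS are hypothetical, not Airflow APIs.

# (task_id, map_index) -> value. Assumption: unmapped tasks use map_index -1.
XCOMS = {
    ("unmapped_task", -1): "unmapped_value",
    ("another_unmapped_task", -1): {"key": "another_value"},
    ("mapped_task", 0): [1, 1],
    ("mapped_task", 1): [2, 2],
    ("mapped_task", 2): [3, 3],
    ("mapped_task", 3): [4, 4],
}


def toy_xcom_pull(store, task_ids, map_indexes=None):
    """Return values shaped like the scenarios exercised by the test DAG."""
    single_task = isinstance(task_ids, str)
    ids = [task_ids] if single_task else list(task_ids)
    single_index = isinstance(map_indexes, int)

    results = []
    for tid in ids:
        if map_indexes is None:
            # Not provided: take every map index stored for this task.
            wanted = sorted(idx for (t, idx) in store if t == tid)
        elif single_index:
            wanted = [map_indexes]
        else:
            wanted = list(map_indexes)
        results.extend(store[(tid, idx)] for idx in wanted if (tid, idx) in store)

    # A bare value comes back only for one task ID given as a string that
    # resolves to one XCom: an explicit single index, or an unmapped task.
    if single_task:
        unmapped = {idx for (t, idx) in store if t == task_ids} == {-1}
        if single_index or (map_indexes is None and unmapped):
            return results[0] if results else None
    return results
```

For example, `toy_xcom_pull(XCOMS, "mapped_task")` yields the values of all four map indexes, while `toy_xcom_pull(XCOMS, "mapped_task", map_indexes=1)` yields only `[2, 2]`.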

image

Logs:

[2025-06-11, 12:59:23] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-11, 12:59:23] INFO - Filling up the DagBag from /files/dags/comprehensive-xcom-test.py: source="airflow.models.dagbag.DagBag"
[2025-06-11, 12:59:23] INFO - === XCOM PULL TESTS ===: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 1. SINGLE TASK ID:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    String: unmapped_value: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    List:   ['unmapped_value']: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 2. MULTIPLE UNMAPPED TASKS:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    Result: ['unmapped_value', {'key': 'another_value'}]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 3. MAPPED TASK - NO MAP_INDEXES:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    All values: [[1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 4. MAPPED TASK - WITH MAP_INDEXES:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    Single index (1): [2, 2]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    Multiple indexes [0,2]: [[1, 1], [3, 3]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 5. MULTIPLE TASKS (MIXED):: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    Mixed: ['unmapped_value', [1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-11, 12:59:23] INFO - 6. MULTIPLE MAPPED TASKS:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO -    Result: [[1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"

@amoghrajesh amoghrajesh requested a review from uranusjr June 10, 2025 11:03
@amoghrajesh amoghrajesh self-assigned this Jun 11, 2025
@amoghrajesh amoghrajesh changed the title from "Improve xcom_pull to reflect reality for mapped tasks" to "Improve xcom_pull to cover different scenarios for mapped tasks" Jun 11, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review June 11, 2025 07:30
@amoghrajesh amoghrajesh requested review from ashb and kaxil as code owners June 11, 2025 07:30
@amoghrajesh amoghrajesh requested a review from uranusjr June 11, 2025 08:16
amoghrajesh and others added 2 commits June 11, 2025 15:42
@amoghrajesh amoghrajesh added this to the Airflow 3.1.0 milestone Jun 11, 2025
@amoghrajesh
Contributor Author

Thanks for the review, merging it.

@amoghrajesh amoghrajesh merged commit 25f7fe3 into apache:main Jun 12, 2025
70 checks passed
@amoghrajesh amoghrajesh deleted the pull-xcom-mapped-task-from-jinja branch June 12, 2025 03:34
boring-cyborg bot commented Jun 12, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

Development

Successfully merging this pull request may close these issues.

Jinja does not find XCom value with dynamic mapping for Airflow 3
2 participants