Skip to content

Displaying non native XComs in a human readable way on UI #51535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 11, 2025

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jun 9, 2025

closes: #49456

Problem

The problem was that we used custom XCom serialisation (if defined) or regular XcomModel serialisation to serialise the xcoms before sending it across to the API server to persist to the DB.

Flow:

  1. XCom is pushed via task return value or ti.xcom_push and then uses the object that is serialised using CustomXComBackend or the XCom class via the XComEncoder (e.g set, Asset, datetime) into Python JSON-serializable object.

    def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length: int | None = None) -> None:

    value = cls.serialize_value(
    value=value,
    key=key,
    task_id=task_id,
    dag_id=dag_id,
    run_id=run_id,
    map_index=map_index,
    )

    from airflow.serialization.serde import serialize
    # return back the value for BaseXCom, custom backends will implement this
    return serialize(value) # type: ignore[return-value]

  2. FastAPI deserializes it (JsonValue) when API call is sent to API server → Converts it into a Python dict.

    self.client.post(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params, json=value)

    value: Annotated[
    JsonValue,
    Body(
    description="A JSON-formatted string representing the value to set for the XCom.",
    openapi_examples={
    "simple_value": {
    "summary": "Simple value",
    "value": '"value1"',
    },
    "dict_value": {
    "summary": "Dictionary value",
    "value": '{"key2": "value2"}',
    },
    "list_value": {
    "summary": "List value",
    "value": '["value1"]',
    },
    },
    ),
    ],

  3. SQLAlchemy stores it (Column(JSON)) → Serializes it back into JSON in the database.

    value = Column(JSON().with_variant(postgresql.JSONB, "postgresql"))

But when de-serializing to show in the UI or API, we don't de-serialize using our Serialization module which happened in (1) above.

Approach

We are certain at all times that the value stored in the DB is going to be a valid JSON object, irrespective of where it got serialised from.

In airflow 3, the custom xcom backend is also only handled on the worker side and the reference for it is stored in the Xcom table in the metadata DB. So, we should not use the custom xcom backend ser / deser on the API server side at all.

A safe way to deserialize would be to pass in the full=False to the serde deserialize so that the deser can happen without loading the custom classes that COULD be present. This will return a stringified version of the object deserialized.

Since we use the "full=False", we will be returning a stringified version of the value when deserialize is set to true. A good way of handling this would be to have the UI not JSON.stringify it if it already is a string to avoid quotes display on the UI.

Testing

DAG used:

from airflow.sdk import ObjectStoragePath
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_class(**kwargs):
    value = ObjectStoragePath("file:///tmp/foo")
    return value

with DAG(
    'xcom_push_class',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_class',
        python_callable=push_class,
    )

    push_xcom_task

Request sent by the UI to the execution API:

curl 'http://localhost:28080/api/v2/dags/xcom_push_class/dagRuns/manual__2025-06-10T07:23:01.644733+00:00/taskInstances/push_class/xcomEntries/return_value?map_index=-1&deserialize=true&stringify=false' \
  -H 'Accept: application/json' \
  -H 'Accept-Language: en-GB,en;q=0.9' \
  -H 'Connection: keep-alive' \
  -H 'Referer: http://localhost:28080/dags/xcom_push_class/runs/manual__2025-06-10T07:23:01.644733+00:00/tasks/push_class/xcom' \
  -H 'Sec-Fetch-Dest: empty' \
  -H 'Sec-Fetch-Mode: cors' \
  -H 'Sec-Fetch-Site: same-origin' \
  -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36' \
  -H 'sec-ch-ua: "Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"' \
  -H 'sec-ch-ua-mobile: ?0' \
  -H 'sec-ch-ua-platform: "macOS"'

Before:

image

After:

image

TODO:

  • Update PR desc
  • Add testing details
  • Fix tests
  • Update UI code and tests to send "deserialize" flag as true

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added the area:API Airflow's REST/HTTP API label Jun 9, 2025
@amoghrajesh amoghrajesh marked this pull request as draft June 9, 2025 15:03
@amoghrajesh amoghrajesh requested review from kaxil and removed request for bugraoz93, shubhamraj-git and jason810496 June 9, 2025 15:03
@amoghrajesh amoghrajesh changed the title Displaying non native XComs in a human friendly way on UI and API Displaying non native XComs in a human friendly way on UI Jun 10, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review June 10, 2025 08:19
@amoghrajesh amoghrajesh changed the title Displaying non native XComs in a human friendly way on UI Displaying non native XComs in a human readable way on UI Jun 10, 2025
Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good to me, not approving just to leave time to @kaxil to check this one as he's more familiar with our xcom serialization model.

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some tests needs fixing, lgtm otherwise.

cc @uranusjr since he had concerns last time

@kaxil kaxil added this to the Airflow 3.1.0 milestone Jun 10, 2025
@amoghrajesh amoghrajesh requested a review from uranusjr June 11, 2025 05:31
@amoghrajesh amoghrajesh force-pushed the deserialize-xcom-native branch from 467e80d to 3db7b6c Compare June 11, 2025 05:39
Co-authored-by: Tzu-ping Chung <[email protected]>
@amoghrajesh amoghrajesh merged commit 81a2b6c into apache:main Jun 11, 2025
96 checks passed
@amoghrajesh amoghrajesh deleted the deserialize-xcom-native branch June 11, 2025 11:20
@amoghrajesh amoghrajesh self-assigned this Jun 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Display non native xcoms in their native forms on Airflow UI
4 participants