-
Notifications
You must be signed in to change notification settings - Fork 15.2k
Displaying non native XComs in a human readable way on UI #51535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good to me, not approving just to leave time to @kaxil to check this one as he's more familiar with our xcom serialization model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some tests needs fixing, lgtm otherwise.
cc @uranusjr since he had concerns last time
467e80d
to
3db7b6c
Compare
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
Outdated
Show resolved
Hide resolved
Co-authored-by: Tzu-ping Chung <[email protected]>
closes: #49456
Problem
The problem was that we used custom XCom serialisation (if defined) or regular XcomModel serialisation to serialise the xcoms before sending it across to the API server to persist to the DB.
Flow:
XCom is pushed via task return value or
ti.xcom_push
and then uses the object that is serialised usingCustomXComBackend
or the XCom class via theXComEncoder
(e.g set, Asset, datetime) into Python JSON-serializable object.airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py
Line 526 in 3208bbb
airflow/task-sdk/src/airflow/sdk/bases/xcom.py
Lines 64 to 71 in 3208bbb
airflow/task-sdk/src/airflow/sdk/bases/xcom.py
Lines 280 to 283 in 3208bbb
FastAPI deserializes it (JsonValue) when API call is sent to API server → Converts it into a Python dict.
airflow/task-sdk/src/airflow/sdk/api/client.py
Line 414 in 3208bbb
airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
Lines 204 to 223 in 3208bbb
SQLAlchemy stores it (Column(JSON)) → Serializes it back into JSON in the database.
airflow/airflow-core/src/airflow/models/xcom.py
Line 79 in 3208bbb
But when de-serializing to show in the UI or API, we don't de-serialize using our Serialization module which happened in (1) above.
Approach
We are certain at all times that the value stored in the DB is going to be a valid JSON object, irrespective of where it got serialised from.
In airflow 3, the custom xcom backend is also only handled on the worker side and the reference for it is stored in the Xcom table in the metadata DB. So, we should not use the custom xcom backend ser / deser on the API server side at all.
A safe way to deserialize would be to pass in the
full=False
to the serde deserialize so that the deser can happen without loading the custom classes that COULD be present. This will return a stringified version of the object deserialized.Since we use the "full=False", we will be returning a stringified version of the value when deserialize is set to true. A good way of handling this would be to have the UI not
JSON.stringify
it if it already is a string to avoid quotes display on the UI.Testing
DAG used:
Request sent by the UI to the execution API:
Before:
After:
TODO:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.