Skip to content

Commit 81a2b6c

Browse files
authored
Displaying non native XComs in a human readable way on UI (#51535)
1 parent 39983ac commit 81a2b6c

File tree

3 files changed

+44
-66
lines changed

3 files changed

+44
-66
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
from airflow.exceptions import TaskNotFound
4242
from airflow.models import DAG, DagRun as DR
4343
from airflow.models.xcom import XComModel
44-
from airflow.settings import conf
4544

4645
xcom_router = AirflowRouter(
4746
tags=["XCom"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries"
@@ -69,41 +68,41 @@ def get_xcom_entry(
6968
stringify: Annotated[bool, Query()] = False,
7069
) -> XComResponseNative | XComResponseString:
7170
"""Get an XCom entry."""
72-
if deserialize:
73-
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
74-
raise HTTPException(
75-
status.HTTP_400_BAD_REQUEST, "XCom deserialization is disabled in configuration."
76-
)
77-
query = select(XComModel, XComModel.value)
78-
else:
79-
query = select(XComModel)
80-
81-
query = query.where(
82-
XComModel.dag_id == dag_id,
83-
XComModel.task_id == task_id,
84-
XComModel.key == xcom_key,
85-
XComModel.map_index == map_index,
71+
xcom_query = XComModel.get_many(
72+
run_id=dag_run_id,
73+
key=xcom_key,
74+
task_ids=task_id,
75+
dag_ids=dag_id,
76+
map_indexes=map_index,
77+
session=session,
78+
limit=1,
8679
)
87-
query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id))
88-
query = query.where(DR.run_id == dag_run_id)
89-
query = query.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
9080

91-
if deserialize:
92-
item = session.execute(query).one_or_none()
93-
else:
94-
item = session.scalars(query).one_or_none()
81+
# We use `BaseXCom.get_many` to fetch XComs directly from the database, bypassing the XCom Backend.
82+
# This avoids deserialization via the backend (e.g., from a remote storage like S3) and instead
83+
# retrieves the raw serialized value from the database.
84+
result = xcom_query.limit(1).first()
9585

96-
if item is None:
86+
if result is None:
9787
raise HTTPException(status.HTTP_404_NOT_FOUND, f"XCom entry with key: `{xcom_key}` not found")
9888

89+
item = copy.copy(result)
90+
9991
if deserialize:
100-
from airflow.sdk.execution_time.xcom import XCom
92+
# We use `airflow.serialization.serde` for deserialization here because custom XCom backends (with their own
93+
# serializers/deserializers) are only used on the worker side during task execution.
94+
95+
# However, the XCom value is *always* stored in the metadata database as a valid JSON object.
96+
# Therefore, for purposes such as UI display or returning API responses, deserializing with
97+
# `airflow.serialization.serde` is safe and recommended.
98+
from airflow.serialization.serde import deserialize as serde_deserialize
10199

102-
xcom, value = item
103-
xcom_stub = copy.copy(xcom)
104-
xcom_stub.value = value
105-
xcom_stub.value = XCom.deserialize_value(xcom_stub)
106-
item = xcom_stub
100+
# full=False ensures that the `item` is deserialized without loading the classes, and it returns a stringified version
101+
item.value = serde_deserialize(XComModel.deserialize_value(item), full=False)
102+
else:
103+
# For native format, return the raw serialized value from the database
104+
# This preserves the JSON string format that the API expects
105+
item.value = result.value
107106

108107
if stringify:
109108
return XComResponseString.model_validate(item)

airflow-core/src/airflow/ui/src/pages/XCom/XComEntry.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,16 @@ export const XComEntry = ({ dagId, mapIndex, runId, taskId, xcomKey }: XComEntry
3535
const { data, isLoading } = useXcomServiceGetXcomEntry<XComResponseNative>({
3636
dagId,
3737
dagRunId: runId,
38+
deserialize: true,
3839
mapIndex,
3940
stringify: false,
4041
taskId,
4142
xcomKey,
4243
});
43-
const valueFormatted = JSON.stringify(data?.value, undefined, 4);
44+
// When deserialize=true, the API returns a stringified representation
45+
// so we don't need to JSON.stringify it again
46+
const valueFormatted =
47+
typeof data?.value === "string" ? data.value : JSON.stringify(data?.value, undefined, 4);
4448

4549
return isLoading ? (
4650
<Skeleton

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -154,63 +154,38 @@ def test_should_raise_404_for_non_existent_xcom(self, test_client):
154154
assert response.json()["detail"] == f"XCom entry with key: `{TEST_XCOM_KEY_2}` not found"
155155

156156
@pytest.mark.parametrize(
157-
"support_deserialize, params, expected_status_or_value",
157+
"params, expected_value",
158158
[
159159
pytest.param(
160-
True,
161160
{"deserialize": True},
162-
f"real deserialized {TEST_XCOM_VALUE_AS_JSON}",
163-
id="enabled deserialize-true",
161+
TEST_XCOM_VALUE,
162+
id="deserialize=true",
164163
),
165164
pytest.param(
166-
False,
167-
{"deserialize": True},
168-
400,
169-
id="disabled deserialize-true",
170-
),
171-
pytest.param(
172-
True,
173-
{"deserialize": False},
174-
f"{TEST_XCOM_VALUE_AS_JSON}",
175-
id="enabled deserialize-false",
176-
),
177-
pytest.param(
178-
False,
179165
{"deserialize": False},
180166
f"{TEST_XCOM_VALUE_AS_JSON}",
181-
id="disabled deserialize-false",
167+
id="deserialize=false",
182168
),
183169
pytest.param(
184-
True,
185170
{},
186171
f"{TEST_XCOM_VALUE_AS_JSON}",
187-
id="enabled default",
188-
),
189-
pytest.param(
190-
False,
191-
{},
192-
f"{TEST_XCOM_VALUE_AS_JSON}",
193-
id="disabled default",
172+
id="default",
194173
),
195174
],
196175
)
197176
@conf_vars({("core", "xcom_backend"): "unit.api_fastapi.core_api.routes.public.test_xcom.CustomXCom"})
198-
def test_custom_xcom_deserialize(
199-
self, support_deserialize: bool, params: str, expected_status_or_value: int | str, test_client
200-
):
177+
def test_custom_xcom_deserialize(self, params: str, expected_value: int | str, test_client):
178+
# Even with a CustomXCom defined, we should not be using it during deserialization because API / UI doesn't integrate their
179+
# deserialization with custom backends
201180
XCom = resolve_xcom_backend()
202181
self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)
203182

204183
url = f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
205184
with mock.patch("airflow.sdk.execution_time.xcom.XCom", XCom):
206-
with conf_vars({("api", "enable_xcom_deserialize_support"): str(support_deserialize)}):
207-
response = test_client.get(url, params=params)
185+
response = test_client.get(url, params=params)
208186

209-
if isinstance(expected_status_or_value, int):
210-
assert response.status_code == expected_status_or_value
211-
else:
212-
assert response.status_code == 200
213-
assert response.json()["value"] == expected_status_or_value
187+
assert response.status_code == 200
188+
assert response.json()["value"] == expected_value
214189

215190

216191
class TestGetXComEntries(TestXComEndpoint):

0 commit comments

Comments
 (0)