Skip to content

Displaying non native XComs in a human readable way on UI #51535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from airflow.exceptions import TaskNotFound
from airflow.models import DAG, DagRun as DR
from airflow.models.xcom import XComModel
from airflow.settings import conf

xcom_router = AirflowRouter(
tags=["XCom"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries"
Expand Down Expand Up @@ -69,41 +68,41 @@ def get_xcom_entry(
stringify: Annotated[bool, Query()] = False,
) -> XComResponseNative | XComResponseString:
"""Get an XCom entry."""
if deserialize:
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "XCom deserialization is disabled in configuration."
)
query = select(XComModel, XComModel.value)
else:
query = select(XComModel)

query = query.where(
XComModel.dag_id == dag_id,
XComModel.task_id == task_id,
XComModel.key == xcom_key,
XComModel.map_index == map_index,
xcom_query = XComModel.get_many(
run_id=dag_run_id,
key=xcom_key,
task_ids=task_id,
dag_ids=dag_id,
map_indexes=map_index,
session=session,
limit=1,
)
query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id))
query = query.where(DR.run_id == dag_run_id)
query = query.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))

if deserialize:
item = session.execute(query).one_or_none()
else:
item = session.scalars(query).one_or_none()
# We use `BaseXCom.get_many` to fetch XComs directly from the database, bypassing the XCom Backend.
# This avoids deserialization via the backend (e.g., from a remote storage like S3) and instead
# retrieves the raw serialized value from the database.
result = xcom_query.limit(1).first()

if item is None:
if result is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"XCom entry with key: `{xcom_key}` not found")

item = copy.copy(result)

if deserialize:
from airflow.sdk.execution_time.xcom import XCom
# We use `airflow.serialization.serde` for deserialization here because custom XCom backends (with their own
# serializers/deserializers) are only used on the worker side during task execution.

# However, the XCom value is *always* stored in the metadata database as a valid JSON object.
# Therefore, for purposes such as UI display or returning API responses, deserializing with
# `airflow.serialization.serde` is safe and recommended.
from airflow.serialization.serde import deserialize as serde_deserialize

xcom, value = item
xcom_stub = copy.copy(xcom)
xcom_stub.value = value
xcom_stub.value = XCom.deserialize_value(xcom_stub)
item = xcom_stub
# full=False ensures that the `item` is deserialized without loading the classes, and it returns a stringified version
item.value = serde_deserialize(XComModel.deserialize_value(item), full=False)
else:
# For native format, return the raw serialized value from the database
# This preserves the JSON string format that the API expects
item.value = result.value

if stringify:
return XComResponseString.model_validate(item)
Expand Down
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/ui/src/pages/XCom/XComEntry.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ export const XComEntry = ({ dagId, mapIndex, runId, taskId, xcomKey }: XComEntry
const { data, isLoading } = useXcomServiceGetXcomEntry<XComResponseNative>({
dagId,
dagRunId: runId,
deserialize: true,
mapIndex,
stringify: false,
taskId,
xcomKey,
});
const valueFormatted = JSON.stringify(data?.value, undefined, 4);
// When deserialize=true, the API returns a stringified representation
// so we don't need to JSON.stringify it again
const valueFormatted =
typeof data?.value === "string" ? data.value : JSON.stringify(data?.value, undefined, 4);

return isLoading ? (
<Skeleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,63 +157,38 @@ def test_should_raise_404_for_non_existent_xcom(self, test_client):
assert response.json()["detail"] == f"XCom entry with key: `{TEST_XCOM_KEY_2}` not found"

@pytest.mark.parametrize(
"support_deserialize, params, expected_status_or_value",
"params, expected_value",
[
pytest.param(
True,
{"deserialize": True},
f"real deserialized {TEST_XCOM_VALUE_AS_JSON}",
id="enabled deserialize-true",
TEST_XCOM_VALUE,
id="deserialize=true",
),
pytest.param(
False,
{"deserialize": True},
400,
id="disabled deserialize-true",
),
pytest.param(
True,
{"deserialize": False},
f"{TEST_XCOM_VALUE_AS_JSON}",
id="enabled deserialize-false",
),
pytest.param(
False,
{"deserialize": False},
f"{TEST_XCOM_VALUE_AS_JSON}",
id="disabled deserialize-false",
id="deserialize=false",
),
pytest.param(
True,
{},
f"{TEST_XCOM_VALUE_AS_JSON}",
id="enabled default",
),
pytest.param(
False,
{},
f"{TEST_XCOM_VALUE_AS_JSON}",
id="disabled default",
id="default",
),
],
)
@conf_vars({("core", "xcom_backend"): "unit.api_fastapi.core_api.routes.public.test_xcom.CustomXCom"})
def test_custom_xcom_deserialize(
self, support_deserialize: bool, params: str, expected_status_or_value: int | str, test_client
):
def test_custom_xcom_deserialize(self, params: str, expected_value: int | str, test_client):
# Even with a CustomXCom defined, we should not be using it during deserialization because API / UI doesn't integrate their
# deserialization with custom backends
XCom = resolve_xcom_backend()
self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)

url = f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
with mock.patch("airflow.sdk.execution_time.xcom.XCom", XCom):
with conf_vars({("api", "enable_xcom_deserialize_support"): str(support_deserialize)}):
response = test_client.get(url, params=params)
response = test_client.get(url, params=params)

if isinstance(expected_status_or_value, int):
assert response.status_code == expected_status_or_value
else:
assert response.status_code == 200
assert response.json()["value"] == expected_status_or_value
assert response.status_code == 200
assert response.json()["value"] == expected_value


class TestGetXComEntries(TestXComEndpoint):
Expand Down