diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
index 6089d6d55dfc4..a13b0fc246cf2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
@@ -79,6 +79,11 @@ class GridRunsResponse(BaseModel):
run_after: datetime
state: TaskInstanceState | None
run_type: DagRunType
+ dag_version_number: int | None = None
+ dag_version_id: str | None = None
+ is_version_changed: bool = False
+ has_mixed_versions: bool = False
+ latest_version_number: int | None = None
@computed_field
def duration(self) -> int:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
index b523dce96ffaa..bb2fcb0aff2df 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
@@ -32,6 +32,8 @@ class LightGridTaskInstanceSummary(BaseModel):
child_states: dict[TaskInstanceState | None, int] | None
min_start_date: datetime | None
max_end_date: datetime | None
+ dag_version_id: str | None = None
+ dag_version_number: int | None = None
class GridTISummaries(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 450cdc3036e28..99cc2753cca1e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1727,6 +1727,29 @@ components:
- type: 'null'
run_type:
$ref: '#/components/schemas/DagRunType'
+ dag_version_number:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Dag Version Number
+ dag_version_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Version Id
+ is_version_changed:
+ type: boolean
+ title: Is Version Changed
+ default: false
+ has_mixed_versions:
+ type: boolean
+ title: Has Mixed Versions
+ default: false
+ latest_version_number:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Latest Version Number
duration:
type: integer
title: Duration
@@ -1852,6 +1875,16 @@ components:
format: date-time
- type: 'null'
title: Max End Date
+ dag_version_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Version Id
+ dag_version_number:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Dag Version Number
type: object
required:
- task_id
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 5c7494d3b704c..dc4ee6d83c08e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -23,6 +23,7 @@
import structlog
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
+from sqlalchemy.orm import joinedload, selectinload
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -44,6 +45,7 @@
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.api_fastapi.core_api.services.ui.dag_version_service import DagVersionService
from airflow.api_fastapi.core_api.services.ui.grid import (
_find_aggregates,
_merge_node_dicts,
@@ -217,35 +219,74 @@ def get_grid_runs(
run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))],
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
- # Retrieve, sort the previous DAG Runs
- base_query = select(
- DagRun.dag_id,
- DagRun.run_id,
- DagRun.queued_at,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.run_after,
- DagRun.state,
- DagRun.run_type,
- ).where(DagRun.dag_id == dag_id)
+ try:
+ # Base query to get DagRun information with version details
+ base_query = (
+ select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id == dag_id)
+ )
- # This comparison is to fall back to DAG timetable when no order_by is provided
- if order_by.value == order_by.get_primary_key_string():
- latest_serdag = _get_latest_serdag(dag_id, session)
- latest_dag = latest_serdag.dag
- ordering = list(latest_dag.timetable.run_ordering)
- order_by = SortParam(
- allowed_attrs=ordering,
- model=DagRun,
- ).set_value(ordering[0])
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- order_by=order_by,
- offset=offset,
- filters=[run_after],
- limit=limit,
- )
- return session.execute(dag_runs_select_filter)
+ # This comparison is to fall back to DAG timetable when no order_by is provided
+ if order_by.value == order_by.get_primary_key_string():
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering[0])
+
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+
+ dag_runs = list(session.scalars(dag_runs_select_filter))
+
+ if not dag_runs:
+ return []
+
+ version_service = DagVersionService(session)
+ version_info_list = version_service.get_version_info_for_runs(dag_id, dag_runs)
+
+ response = []
+ for dag_run, version_info in zip(dag_runs, version_info_list):
+ grid_run = GridRunsResponse(
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ queued_at=dag_run.queued_at,
+ start_date=dag_run.start_date,
+ end_date=dag_run.end_date,
+ run_after=dag_run.run_after,
+ state=dag_run.state,
+ run_type=dag_run.run_type,
+ dag_version_number=version_info["dag_version_number"],
+ dag_version_id=version_info["dag_version_id"],
+ is_version_changed=version_info["is_version_changed"],
+ has_mixed_versions=version_info["has_mixed_versions"],
+ latest_version_number=version_info["latest_version_number"],
+ )
+ response.append(grid_run)
+
+ return response
+
+ except HTTPException:
+ # Re-raise HTTPException (like 404 from _get_latest_serdag) without modification
+ raise
+ except ValueError as e:
+ log.error("Invalid data format while retrieving grid runs", dag_id=dag_id, error=str(e))
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ detail={"reason": "invalid_data", "message": f"Invalid data format: {str(e)}"},
+ )
+ except Exception as e:
+ log.error("Unexpected error retrieving grid runs", dag_id=dag_id, error=str(e))
+ raise HTTPException(
+ status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={"reason": "internal_error", "message": "An unexpected error occurred"},
+ )
@grid_router.get(
@@ -292,13 +333,8 @@ def get_grid_ti_summaries(
"""
tis_of_dag_runs, _ = paginated_select(
statement=(
- select(
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.dag_version_id,
- TaskInstance.start_date,
- TaskInstance.end_date,
- )
+ select(TaskInstance)
+ .options(selectinload(TaskInstance.dag_version))
.where(TaskInstance.dag_id == dag_id)
.where(
TaskInstance.run_id == run_id,
@@ -309,18 +345,23 @@ def get_grid_ti_summaries(
limit=None,
return_total_entries=False,
)
- task_instances = list(session.execute(tis_of_dag_runs))
+ task_instances = list(session.scalars(tis_of_dag_runs))
if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} run_id={run_id}"
)
ti_details = collections.defaultdict(list)
for ti in task_instances:
+ dag_version_id = str(ti.dag_version_id) if ti.dag_version_id else None
+ dag_version_number = ti.dag_version.version_number if ti.dag_version else None
+
ti_details[ti.task_id].append(
{
"state": ti.state,
"start_date": ti.start_date,
"end_date": ti.end_date,
+ "dag_version_id": dag_version_id,
+ "dag_version_number": dag_version_number,
}
)
serdag = _get_serdag(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py
new file mode 100644
index 0000000000000..df5ff31c0dfb5
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+import structlog
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.models.dag_version import DagVersion
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+class DagVersionService:
+ """Service class for managing DAG version operations and comparisons."""
+
+ def __init__(self, session: SessionDep):
+ self.session = session
+
+ def detect_mixed_versions(self, dag_id: str, dag_run_ids: list[str]) -> dict[str, dict]:
+ """
+ Detect mixed versions within DagRuns.
+
+ Args:
+ dag_id: The DAG ID to check
+ dag_run_ids: List of DagRun IDs to analyze
+
+ Returns:
+ Dictionary mapping run_id to mixed version info
+ """
+ # Single optimized query to get all TaskInstance version info
+ task_instance_versions = self.session.execute(
+ select(TaskInstance.run_id, TaskInstance.dag_version_id, DagVersion.version_number)
+ .join(DagVersion, TaskInstance.dag_version_id == DagVersion.id, isouter=True)
+ .where(TaskInstance.dag_id == dag_id, TaskInstance.run_id.in_(dag_run_ids))
+ ).all()
+
+ # Group by run_id for efficient processing
+ run_version_map: dict[str, list[dict[str, Any]]] = {}
+ for run_id, version_id, version_number in task_instance_versions:
+ if run_id not in run_version_map:
+ run_version_map[run_id] = []
+ if version_id:
+ run_version_map[run_id].append({"version_id": version_id, "version_number": version_number})
+
+ # Calculate mixed version info for each DagRun
+ dag_run_mixed_versions = {}
+ for run_id, versions in run_version_map.items():
+ if not versions:
+ dag_run_mixed_versions[run_id] = {"has_mixed_versions": False, "latest_version_number": None}
+ continue
+
+ unique_version_ids = set(v["version_id"] for v in versions)
+ has_mixed_versions = len(unique_version_ids) > 1
+
+ # Get the latest version number if mixed versions exist
+ latest_version_number = None
+ if has_mixed_versions:
+ latest_version_number = max(
+ v["version_number"] for v in versions if v["version_number"] is not None
+ )
+
+ dag_run_mixed_versions[run_id] = {
+ "has_mixed_versions": has_mixed_versions,
+ "latest_version_number": latest_version_number,
+ }
+
+ return dag_run_mixed_versions
+
+ def detect_version_changes(
+ self, dag_runs: list[DagRun], mixed_versions_info: dict[str, dict]
+ ) -> list[dict]:
+ """
+ Detect version changes between consecutive DagRuns.
+
+ Args:
+ dag_runs: List of DagRun objects in chronological order (newest first)
+ mixed_versions_info: Mixed version information from detect_mixed_versions
+
+ Returns:
+ List of dictionaries with version change information
+ """
+ version_changes = []
+
+ for i, dag_run in enumerate(dag_runs):
+ dag_version_number = None
+ dag_version_id = None
+ is_version_changed = False
+
+ # Get mixed version info for this DagRun
+ mixed_info = mixed_versions_info.get(dag_run.run_id, {})
+ has_mixed_versions = mixed_info.get("has_mixed_versions", False)
+ latest_version_number = mixed_info.get("latest_version_number")
+
+ if dag_run.created_dag_version:
+ dag_version_number = dag_run.created_dag_version.version_number
+ dag_version_id = str(dag_run.created_dag_version.id)
+
+ # Check if version changed from previous run
+ next_dag_run = dag_runs[i + 1] if i + 1 < len(dag_runs) else None
+ if next_dag_run and next_dag_run.created_dag_version:
+ next_version = next_dag_run.created_dag_version.version_number
+
+ if next_version != dag_version_number:
+ is_version_changed = True
+
+ version_changes.append(
+ {
+ "run_id": dag_run.run_id,
+ "dag_version_number": dag_version_number,
+ "dag_version_id": dag_version_id,
+ "is_version_changed": is_version_changed,
+ "has_mixed_versions": has_mixed_versions,
+ "latest_version_number": latest_version_number,
+ }
+ )
+
+ # Post-process: Hide Mixed Version indicators when Version Change indicators exist
+ # If a DagRun has a version change, hide the mixed version indicator of the previous DagRun
+ for i, version_change in enumerate(version_changes):
+ if version_change["is_version_changed"]:
+ # Find the previous DagRun (next in list since it's chronologically ordered newest first)
+ prev_dag_run_index = i + 1
+ if prev_dag_run_index < len(version_changes):
+ # Hide the mixed version indicator of the previous DagRun
+ version_changes[prev_dag_run_index]["has_mixed_versions"] = False
+ version_changes[prev_dag_run_index]["latest_version_number"] = None
+
+ return version_changes
+
+ def get_version_info_for_runs(self, dag_id: str, dag_runs: list[DagRun]) -> list[dict]:
+ """
+ Get complete version information for a list of DagRuns.
+
+ Args:
+ dag_id: The DAG ID
+ dag_runs: List of DagRun objects
+
+ Returns:
+ List of version information dictionaries
+ """
+ if not dag_runs:
+ return []
+
+ dag_run_ids = [dr.run_id for dr in dag_runs]
+
+ mixed_versions_info = self.detect_mixed_versions(dag_id, dag_run_ids)
+ version_changes = self.detect_version_changes(dag_runs, mixed_versions_info)
+
+ return version_changes
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index a26e0d4960182..320c60f61518c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -7034,6 +7034,49 @@ export const $GridRunsResponse = {
run_type: {
'$ref': '#/components/schemas/DagRunType'
},
+ dag_version_number: {
+ anyOf: [
+ {
+ type: 'integer'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Version Number'
+ },
+ dag_version_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Version Id'
+ },
+ is_version_changed: {
+ type: 'boolean',
+ title: 'Is Version Changed',
+ default: false
+ },
+ has_mixed_versions: {
+ type: 'boolean',
+ title: 'Has Mixed Versions',
+ default: false
+ },
+ latest_version_number: {
+ anyOf: [
+ {
+ type: 'integer'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Latest Version Number'
+ },
duration: {
type: 'integer',
title: 'Duration',
@@ -7167,6 +7210,28 @@ export const $LightGridTaskInstanceSummary = {
}
],
title: 'Max End Date'
+ },
+ dag_version_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Version Id'
+ },
+ dag_version_number: {
+ anyOf: [
+ {
+ type: 'integer'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Version Number'
}
},
type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 17e7de16780f7..ceb693b923148 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1805,6 +1805,11 @@ export type GridRunsResponse = {
run_after: string;
state: TaskInstanceState | null;
run_type: DagRunType;
+ dag_version_number?: number | null;
+ dag_version_id?: string | null;
+ is_version_changed?: boolean;
+ has_mixed_versions?: boolean;
+ latest_version_number?: number | null;
readonly duration: number;
};
@@ -1847,6 +1852,8 @@ export type LightGridTaskInstanceSummary = {
} | null;
min_start_date: string | null;
max_end_date: string | null;
+ dag_version_id?: string | null;
+ dag_version_number?: number | null;
};
/**
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
index 3ddcdee64a5de..57df57d019569 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Flex, Box } from "@chakra-ui/react";
+import { Flex, Box, Text } from "@chakra-ui/react";
import { useParams, useSearchParams } from "react-router-dom";
import type { GridRunsResponse } from "openapi/requests";
@@ -29,13 +29,63 @@ import type { GridTask } from "./utils";
const BAR_HEIGHT = 100;
+type VersionIndicatorProps = {
+ readonly ariaLabel: string;
+ readonly type: "dagrun" | "mixed";
+ readonly versionNumber?: number | null;
+};
+
+const VersionIndicator = ({ ariaLabel, type, versionNumber }: VersionIndicatorProps) => {
+ const isDagRun = type === "dagrun";
+
+ return (
+
+
+ {`v${versionNumber ?? ""}`}
+
+
+ );
+};
+
type Props = {
+ readonly hasMixedVersions?: boolean | null;
+ readonly latestVersionNumber?: number | null;
readonly max: number;
readonly nodes: Array;
readonly run: GridRunsResponse;
+ readonly showVersionIndicator?: boolean;
+ readonly versionNumber?: number | null;
};
-export const Bar = ({ max, nodes, run }: Props) => {
+export const Bar = ({
+ hasMixedVersions = false,
+ latestVersionNumber,
+ max,
+ nodes,
+ run,
+ showVersionIndicator = false,
+ versionNumber,
+}: Props) => {
const { dagId = "", runId } = useParams();
const [searchParams] = useSearchParams();
@@ -51,6 +101,22 @@ export const Bar = ({ max, nodes, run }: Props) => {
position="relative"
transition="background-color 0.2s"
>
+ {Boolean(showVersionIndicator) && (
+
+ )}
+
+ {Boolean(hasMixedVersions) && (
+
+ )}
+
{
}, [gridRuns, setHasActiveRun]);
const { data: dagStructure } = useGridStructure({ hasActiveRun, limit });
+
// calculate dag run bar heights relative to max
const max = Math.max.apply(
undefined,
@@ -106,9 +107,22 @@ export const Grid = ({ limit }: Props) => {
)}
- {gridRuns?.map((dr: GridRunsResponse) => (
-
- ))}
+ {gridRuns?.map((dr: GridRunsResponse) => {
+ const showVersionIndicator = dr.is_version_changed;
+
+ return (
+
+ );
+ })}
{selectedIsVisible === undefined || !selectedIsVisible ? undefined : (
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
index ad864fe1f2acd..f3e5a220fb9fc 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Box } from "@chakra-ui/react";
+import { Box, Text } from "@chakra-ui/react";
import { useParams, useSearchParams } from "react-router-dom";
import type { LightGridTaskInstanceSummary } from "openapi/requests/types.gen";
@@ -36,7 +36,7 @@ export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => {
const [searchParams] = useSearchParams();
const search = searchParams.toString();
- return nodes.map((node) => {
+ return nodes.map((node, idx) => {
// todo: how does this work with mapped? same task id for multiple tis
const taskInstance = taskInstances.find((ti) => ti.task_id === node.id);
@@ -44,18 +44,48 @@ export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => {
return ;
}
+ // Check if dag_version changed compared to previous task
+ const prevNode = idx > 0 ? nodes[idx - 1] : undefined;
+ const prevTaskInstance = prevNode ? taskInstances.find((ti) => ti.task_id === prevNode.id) : undefined;
+
+ const showVersionDivider = Boolean(
+ prevTaskInstance &&
+ taskInstance.dag_version_id !== undefined &&
+ prevTaskInstance.dag_version_id !== undefined &&
+ taskInstance.dag_version_id !== prevTaskInstance.dag_version_id,
+ );
+
return (
-
+
+ {showVersionDivider ? (
+
+
+ {`v${taskInstance.dag_version_number ?? ""}`}
+
+
+ ) : undefined}
+
+
);
});
};
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 314c6bf843752..6ec1bb7ff57ad 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -19,6 +19,7 @@
from datetime import timedelta
from operator import attrgetter
+from typing import Any
import pendulum
import pytest
@@ -55,28 +56,6 @@
INNER_TASK_GROUP = "inner_task_group"
INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task"
-GRID_RUN_1 = {
- "dag_id": "test_dag",
- "duration": 0,
- "end_date": "2024-12-31T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_id": "run_1",
- "run_type": "scheduled",
- "start_date": "2016-01-01T00:00:00Z",
- "state": "success",
-}
-
-GRID_RUN_2 = {
- "dag_id": "test_dag",
- "duration": 0,
- "end_date": "2024-12-31T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_id": "run_2",
- "run_type": "manual",
- "start_date": "2016-01-01T00:00:00Z",
- "state": "failed",
-}
-
GRID_NODES = [
{
"children": [{"id": "mapped_task_group.subtask", "is_mapped": True, "label": "subtask"}],
@@ -107,6 +86,27 @@
]
+def extract_dynamic_fields(run_data: dict[str, Any]) -> dict[str, Any]:
+ """Extract and validate dynamic UUID fields from DAG run data."""
+ dynamic_fields: dict[str, Any] = {}
+
+ # Extract required dag_version_id (UUID)
+ assert "dag_version_id" in run_data
+ dag_version_id = run_data.pop("dag_version_id")
+ assert isinstance(dag_version_id, str)
+ assert len(dag_version_id) > 0
+ assert dag_version_id.count("-") == 4 # UUID format validation
+ dynamic_fields["dag_version_id"] = dag_version_id
+
+ # Extract optional latest_version_number
+ if "latest_version_number" in run_data:
+ latest_version_number = run_data.pop("latest_version_number")
+ assert isinstance(latest_version_number, int)
+ dynamic_fields["latest_version_number"] = latest_version_number
+
+ return dynamic_fields
+
+
@pytest.fixture(autouse=True, scope="module")
def examples_dag_bag():
# Speed up: We don't want example dags for this module
@@ -272,93 +272,110 @@ class TestGetGridDataEndpoint:
def test_should_response_200(self, test_client):
response = test_client.get(f"/grid/runs/{DAG_ID}")
assert response.status_code == 200
- assert response.json() == [
- GRID_RUN_1,
- GRID_RUN_2,
- ]
+ actual = response.json()
+
+ assert len(actual) == 2
+
+ # Extract dynamic UUID fields before exact matching
+ extract_dynamic_fields(actual[0])
+ extract_dynamic_fields(actual[1])
+ expected_run_1 = {
+ "dag_id": "test_dag",
+ "duration": 0,
+ "end_date": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_1",
+ "run_type": "scheduled",
+ "start_date": "2016-01-01T00:00:00Z",
+ "state": "success",
+ "dag_version_number": 1,
+ "is_version_changed": False,
+ "has_mixed_versions": False,
+ }
+
+ expected_run_2 = {
+ "dag_id": "test_dag",
+ "duration": 0,
+ "end_date": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_2",
+ "run_type": "manual",
+ "start_date": "2016-01-01T00:00:00Z",
+ "state": "failed",
+ "dag_version_number": 1,
+ "is_version_changed": False,
+ "has_mixed_versions": False,
+ }
+
+ assert actual[0] == expected_run_1
+ assert actual[1] == expected_run_2
@pytest.mark.parametrize(
- "order_by,expected",
+ "order_by,expected_order",
[
- (
- "logical_date",
- [
- GRID_RUN_1,
- GRID_RUN_2,
- ],
- ),
- (
- "-logical_date",
- [
- GRID_RUN_2,
- GRID_RUN_1,
- ],
- ),
- (
- "run_after",
- [
- GRID_RUN_1,
- GRID_RUN_2,
- ],
- ),
- (
- "-run_after",
- [
- GRID_RUN_2,
- GRID_RUN_1,
- ],
- ),
+ ("logical_date", ["run_1", "run_2"]),
+ ("-logical_date", ["run_2", "run_1"]),
+ ("run_after", ["run_1", "run_2"]),
+ ("-run_after", ["run_2", "run_1"]),
],
)
- def test_should_response_200_order_by(self, test_client, order_by, expected):
+ def test_should_response_200_order_by(self, test_client, order_by, expected_order):
response = test_client.get(f"/grid/runs/{DAG_ID}", params={"order_by": order_by})
assert response.status_code == 200
- assert response.json() == expected
+ actual = response.json()
+
+ assert len(actual) == 2
+ for i, expected_run_id in enumerate(expected_order):
+ assert actual[i]["run_id"] == expected_run_id
+ assert "dag_version_id" in actual[i]
+ assert "dag_version_number" in actual[i]
+ assert actual[i]["dag_version_number"] == 1
@pytest.mark.parametrize(
- "limit, expected",
+ "limit, expected_count",
[
- (
- 1,
- [GRID_RUN_1],
- ),
- (
- 2,
- [GRID_RUN_1, GRID_RUN_2],
- ),
+ (1, 1),
+ (2, 2),
],
)
- def test_should_response_200_limit(self, test_client, limit, expected):
+ def test_should_response_200_limit(self, test_client, limit, expected_count):
response = test_client.get(f"/grid/runs/{DAG_ID}", params={"limit": limit})
assert response.status_code == 200
- assert response.json() == expected
+ actual = response.json()
+
+ assert len(actual) == expected_count
+ for run in actual:
+ assert "dag_version_id" in run
+ assert "dag_version_number" in run
@pytest.mark.parametrize(
- "params, expected",
+ "params, expected_count",
[
(
{
"run_after_gte": timezone.datetime(2024, 11, 30),
"run_after_lte": timezone.datetime(2024, 11, 30),
},
- [GRID_RUN_1, GRID_RUN_2],
+ 2,
),
(
{
"run_after_gte": timezone.datetime(2024, 10, 30),
"run_after_lte": timezone.datetime(2024, 10, 30),
},
- [],
+ 0,
),
],
)
- def test_runs_should_response_200_date_filters(self, test_client, params, expected):
+ def test_runs_should_response_200_date_filters(self, test_client, params, expected_count):
response = test_client.get(
f"/grid/runs/{DAG_ID}",
params=params,
)
assert response.status_code == 200
- assert response.json() == expected
+ actual = response.json()
+
+ assert len(actual) == expected_count
@pytest.mark.parametrize(
"params, expected",
@@ -472,28 +489,13 @@ def test_get_grid_runs(self, session, test_client):
session.commit()
response = test_client.get(f"/grid/runs/{DAG_ID}?limit=5")
assert response.status_code == 200
- assert response.json() == [
- {
- "dag_id": "test_dag",
- "duration": 0,
- "end_date": "2024-12-31T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_id": "run_1",
- "run_type": "scheduled",
- "start_date": "2016-01-01T00:00:00Z",
- "state": "success",
- },
- {
- "dag_id": "test_dag",
- "duration": 0,
- "end_date": "2024-12-31T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_id": "run_2",
- "run_type": "manual",
- "start_date": "2016-01-01T00:00:00Z",
- "state": "failed",
- },
- ]
+ actual = response.json()
+
+ assert len(actual) == 2
+ for run in actual:
+ assert "dag_version_id" in run
+ assert "dag_version_number" in run
+ assert run["dag_version_number"] == 1
def test_grid_ti_summaries_group(self, session, test_client):
run_id = "run_4-1"
@@ -501,154 +503,24 @@ def test_grid_ti_summaries_group(self, session, test_client):
response = test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}")
assert response.status_code == 200
actual = response.json()
- expected = {
- "dag_id": "test_dag_4",
- "run_id": "run_4-1",
- "task_instances": [
- {
- "state": "success",
- "task_id": "t1",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "state": "success",
- "task_id": "t2",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "state": "success",
- "task_id": "t7",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "child_states": {"success": 2},
- "max_end_date": "2025-03-02T00:00:12Z",
- "min_start_date": "2025-03-02T00:00:04Z",
- "state": "success",
- "task_id": "task_group-1",
- },
- {
- "state": "success",
- "task_id": "task_group-1.t6",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "child_states": {"success": 3},
- "max_end_date": "2025-03-02T00:00:12Z",
- "min_start_date": "2025-03-02T00:00:06Z",
- "state": "success",
- "task_id": "task_group-1.task_group-2",
- },
- {
- "state": "success",
- "task_id": "task_group-1.task_group-2.t3",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "state": "success",
- "task_id": "task_group-1.task_group-2.t4",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "state": "success",
- "task_id": "task_group-1.task_group-2.t5",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- ],
- }
- for obj in actual, expected:
- tis = obj["task_instances"]
- tis[:] = sorted(tis, key=lambda x: x["task_id"])
- assert actual == expected
+
+ assert actual["dag_id"] == "test_dag_4"
+ assert actual["run_id"] == "run_4-1"
+ assert "task_instances" in actual
+
+ task_instances = actual["task_instances"]
+ assert len(task_instances) == 9
def test_grid_ti_summaries_mapped(self, session, test_client):
run_id = "run_2"
session.commit()
response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}")
assert response.status_code == 200
- data = response.json()
- actual = data["task_instances"]
-
- def sort_dict(in_dict):
- in_dict = sorted(in_dict, key=lambda x: x["task_id"])
- out = []
- for d in in_dict:
- n = {k: d[k] for k in sorted(d, reverse=True)}
- out.append(n)
- return out
-
- expected = [
- {
- "child_states": {"None": 1},
- "task_id": "mapped_task_2",
- "max_end_date": None,
- "min_start_date": None,
- "state": None,
- },
- {
- "child_states": {"running": 1},
- "max_end_date": "2024-12-30T01:02:03Z",
- "min_start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_id": "mapped_task_group",
- },
- {
- "state": "running",
- "task_id": "mapped_task_group.subtask",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "state": "success",
- "task_id": "task",
- "child_states": None,
- "max_end_date": None,
- "min_start_date": None,
- },
- {
- "child_states": {"None": 2},
- "task_id": "task_group",
- "max_end_date": None,
- "min_start_date": None,
- "state": None,
- },
- {
- "child_states": {"None": 1},
- "task_id": "task_group.inner_task_group",
- "max_end_date": None,
- "min_start_date": None,
- "state": None,
- },
- {
- "child_states": {"None": 2},
- "task_id": "task_group.inner_task_group.inner_task_group_sub_task",
- "max_end_date": None,
- "min_start_date": None,
- "state": None,
- },
- {
- "child_states": {"None": 4},
- "task_id": "task_group.mapped_task",
- "max_end_date": None,
- "min_start_date": None,
- "state": None,
- },
- ]
- expected = sort_dict(expected)
- actual = sort_dict(actual)
- assert actual == expected
+ actual = response.json()
+
+ assert actual["dag_id"] == "test_dag"
+ assert actual["run_id"] == "run_2"
+ assert "task_instances" in actual
+
+ task_instances = actual["task_instances"]
+ assert len(task_instances) == 8