diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py index 6089d6d55dfc4..a13b0fc246cf2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py @@ -79,6 +79,11 @@ class GridRunsResponse(BaseModel): run_after: datetime state: TaskInstanceState | None run_type: DagRunType + dag_version_number: int | None = None + dag_version_id: str | None = None + is_version_changed: bool = False + has_mixed_versions: bool = False + latest_version_number: int | None = None @computed_field def duration(self) -> int: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py index b523dce96ffaa..bb2fcb0aff2df 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py @@ -32,6 +32,8 @@ class LightGridTaskInstanceSummary(BaseModel): child_states: dict[TaskInstanceState | None, int] | None min_start_date: datetime | None max_end_date: datetime | None + dag_version_id: str | None = None + dag_version_number: int | None = None class GridTISummaries(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 450cdc3036e28..99cc2753cca1e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1727,6 +1727,29 @@ components: - type: 'null' run_type: $ref: '#/components/schemas/DagRunType' + dag_version_number: + anyOf: + - type: integer + - type: 'null' + title: Dag Version Number + dag_version_id: + anyOf: + - type: string + - type: 'null' + title: Dag Version Id + is_version_changed: + type: boolean + title: Is Version Changed + default: false + has_mixed_versions: + type: boolean + title: Has Mixed Versions + default: false + latest_version_number: + anyOf: + - type: integer + - type: 'null' + title: Latest Version Number duration: type: integer title: Duration @@ -1852,6 +1875,16 @@ components: format: date-time - type: 'null' title: Max End Date + dag_version_id: + anyOf: + - type: string + - type: 'null' + title: Dag Version Id + dag_version_number: + anyOf: + - type: integer + - type: 'null' + title: Dag Version Number type: object required: - task_id diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py index 5c7494d3b704c..dc4ee6d83c08e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py @@ -23,6 +23,7 @@ import structlog from fastapi import Depends, HTTPException, status from sqlalchemy import select +from sqlalchemy.orm import joinedload, selectinload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep, paginated_select @@ -44,6 +45,7 @@ ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import requires_access_dag +from airflow.api_fastapi.core_api.services.ui.dag_version_service import DagVersionService from airflow.api_fastapi.core_api.services.ui.grid import ( _find_aggregates, _merge_node_dicts, @@ -217,35 +219,74 @@ def get_grid_runs( run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))], ) -> list[GridRunsResponse]: """Get info about a run for the grid.""" - # Retrieve, sort the previous DAG Runs - base_query = select( - DagRun.dag_id, - DagRun.run_id, - DagRun.queued_at, - DagRun.start_date, - DagRun.end_date, - DagRun.run_after, - DagRun.state, - DagRun.run_type, - ).where(DagRun.dag_id == dag_id) + try: + # Base query to get DagRun information with version details + base_query = ( + select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id == dag_id) + ) - # This comparison is to fall back to DAG timetable when no order_by is provided - if order_by.value == order_by.get_primary_key_string(): - latest_serdag = _get_latest_serdag(dag_id, session) - latest_dag = latest_serdag.dag - ordering = list(latest_dag.timetable.run_ordering) - order_by = SortParam( - allowed_attrs=ordering, - model=DagRun, - ).set_value(ordering[0]) - dag_runs_select_filter, _ = paginated_select( - statement=base_query, - order_by=order_by, - offset=offset, - filters=[run_after], - limit=limit, - ) - return session.execute(dag_runs_select_filter) + # This comparison is to fall back to DAG timetable when no order_by is provided + if order_by.value == order_by.get_primary_key_string(): + latest_serdag = _get_latest_serdag(dag_id, session) + latest_dag = latest_serdag.dag + ordering = list(latest_dag.timetable.run_ordering) + order_by = SortParam( + allowed_attrs=ordering, + model=DagRun, + ).set_value(ordering[0]) + + dag_runs_select_filter, _ = paginated_select( + statement=base_query, + order_by=order_by, + offset=offset, + filters=[run_after], + limit=limit, + ) + + dag_runs = list(session.scalars(dag_runs_select_filter)) + + if not dag_runs: + return [] + + version_service = DagVersionService(session) + version_info_list = version_service.get_version_info_for_runs(dag_id, dag_runs) + + response = [] + for dag_run, version_info in zip(dag_runs, version_info_list): + grid_run = GridRunsResponse( + dag_id=dag_run.dag_id, + run_id=dag_run.run_id, + queued_at=dag_run.queued_at, + start_date=dag_run.start_date, + end_date=dag_run.end_date, + run_after=dag_run.run_after, + state=dag_run.state, + run_type=dag_run.run_type, + dag_version_number=version_info["dag_version_number"], + dag_version_id=version_info["dag_version_id"], + is_version_changed=version_info["is_version_changed"], + has_mixed_versions=version_info["has_mixed_versions"], + latest_version_number=version_info["latest_version_number"], + ) + response.append(grid_run) + + return response + + except HTTPException: + # Re-raise HTTPException (like 404 from _get_latest_serdag) without modification + raise + except ValueError as e: + log.error("Invalid data format while retrieving grid runs", dag_id=dag_id, error=str(e)) + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + detail={"reason": "invalid_data", "message": f"Invalid data format: {str(e)}"}, + ) + except Exception as e: + log.error("Unexpected error retrieving grid runs", dag_id=dag_id, error=str(e)) + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"reason": "internal_error", "message": "An unexpected error occurred"}, + ) @grid_router.get( @@ -292,13 +333,8 @@ def get_grid_ti_summaries( """ tis_of_dag_runs, _ = paginated_select( statement=( - select( - TaskInstance.task_id, - TaskInstance.state, - TaskInstance.dag_version_id, - TaskInstance.start_date, - TaskInstance.end_date, - ) + select(TaskInstance) + .options(selectinload(TaskInstance.dag_version)) .where(TaskInstance.dag_id == dag_id) .where( TaskInstance.run_id == run_id, @@ -309,18 +345,23 @@ def get_grid_ti_summaries( limit=None, return_total_entries=False, ) - task_instances = list(session.execute(tis_of_dag_runs)) + task_instances = list(session.scalars(tis_of_dag_runs)) if not task_instances: raise HTTPException( status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} run_id={run_id}" ) ti_details = collections.defaultdict(list) for ti in task_instances: + dag_version_id = str(ti.dag_version_id) if ti.dag_version_id else None + dag_version_number = ti.dag_version.version_number if ti.dag_version else None + ti_details[ti.task_id].append( { "state": ti.state, "start_date": ti.start_date, "end_date": ti.end_date, + "dag_version_id": dag_version_id, + "dag_version_number": dag_version_number, } ) serdag = _get_serdag( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py new file mode 100644 index 0000000000000..df5ff31c0dfb5 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dag_version_service.py @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Any + +import structlog +from sqlalchemy import select + +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.models.dag_version import DagVersion +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance + +log = structlog.get_logger(logger_name=__name__) + + +class DagVersionService: + """Service class for managing DAG version operations and comparisons.""" + + def __init__(self, session: SessionDep): + self.session = session + + def detect_mixed_versions(self, dag_id: str, dag_run_ids: list[str]) -> dict[str, dict]: + """ + Detect mixed versions within DagRuns. + + Args: + dag_id: The DAG ID to check + dag_run_ids: List of DagRun IDs to analyze + + Returns: + Dictionary mapping run_id to mixed version info + """ + # Single optimized query to get all TaskInstance version info + task_instance_versions = self.session.execute( + select(TaskInstance.run_id, TaskInstance.dag_version_id, DagVersion.version_number) + .join(DagVersion, TaskInstance.dag_version_id == DagVersion.id, isouter=True) + .where(TaskInstance.dag_id == dag_id, TaskInstance.run_id.in_(dag_run_ids)) + ).all() + + # Group by run_id for efficient processing + run_version_map: dict[str, list[dict[str, Any]]] = {} + for run_id, version_id, version_number in task_instance_versions: + if run_id not in run_version_map: + run_version_map[run_id] = [] + if version_id: + run_version_map[run_id].append({"version_id": version_id, "version_number": version_number}) + + # Calculate mixed version info for each DagRun + dag_run_mixed_versions = {} + for run_id, versions in run_version_map.items(): + if not versions: + dag_run_mixed_versions[run_id] = {"has_mixed_versions": False, "latest_version_number": None} + continue + + unique_version_ids = set(v["version_id"] for v in versions) + has_mixed_versions = len(unique_version_ids) > 1 + + # Get the latest version number if mixed versions exist + latest_version_number = None + if has_mixed_versions: + latest_version_number = max( + v["version_number"] for v in versions if v["version_number"] is not None + ) + + dag_run_mixed_versions[run_id] = { + "has_mixed_versions": has_mixed_versions, + "latest_version_number": latest_version_number, + } + + return dag_run_mixed_versions + + def detect_version_changes( + self, dag_runs: list[DagRun], mixed_versions_info: dict[str, dict] + ) -> list[dict]: + """ + Detect version changes between consecutive DagRuns. + + Args: + dag_runs: List of DagRun objects in chronological order (newest first) + mixed_versions_info: Mixed version information from detect_mixed_versions + + Returns: + List of dictionaries with version change information + """ + version_changes = [] + + for i, dag_run in enumerate(dag_runs): + dag_version_number = None + dag_version_id = None + is_version_changed = False + + # Get mixed version info for this DagRun + mixed_info = mixed_versions_info.get(dag_run.run_id, {}) + has_mixed_versions = mixed_info.get("has_mixed_versions", False) + latest_version_number = mixed_info.get("latest_version_number") + + if dag_run.created_dag_version: + dag_version_number = dag_run.created_dag_version.version_number + dag_version_id = str(dag_run.created_dag_version.id) + + # Check if version changed from previous run + next_dag_run = dag_runs[i + 1] if i + 1 < len(dag_runs) else None + if next_dag_run and next_dag_run.created_dag_version: + next_version = next_dag_run.created_dag_version.version_number + + if next_version != dag_version_number: + is_version_changed = True + + version_changes.append( + { + "run_id": dag_run.run_id, + "dag_version_number": dag_version_number, + "dag_version_id": dag_version_id, + "is_version_changed": is_version_changed, + "has_mixed_versions": has_mixed_versions, + "latest_version_number": latest_version_number, + } + ) + + # Post-process: Hide Mixed Version indicators when Version Change indicators exist + # If a DagRun has a version change, hide the mixed version indicator of the previous DagRun + for i, version_change in enumerate(version_changes): + if version_change["is_version_changed"]: + # Find the previous DagRun (next in list since it's chronologically ordered newest first) + prev_dag_run_index = i + 1 + if prev_dag_run_index < len(version_changes): + # Hide the mixed version indicator of the previous DagRun + version_changes[prev_dag_run_index]["has_mixed_versions"] = False + version_changes[prev_dag_run_index]["latest_version_number"] = None + + return version_changes + + def get_version_info_for_runs(self, dag_id: str, dag_runs: list[DagRun]) -> list[dict]: + """ + Get complete version information for a list of DagRuns. + + Args: + dag_id: The DAG ID + dag_runs: List of DagRun objects + + Returns: + List of version information dictionaries + """ + if not dag_runs: + return [] + + dag_run_ids = [dr.run_id for dr in dag_runs] + + mixed_versions_info = self.detect_mixed_versions(dag_id, dag_run_ids) + version_changes = self.detect_version_changes(dag_runs, mixed_versions_info) + + return version_changes diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index a26e0d4960182..320c60f61518c 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -7034,6 +7034,49 @@ export const $GridRunsResponse = { run_type: { '$ref': '#/components/schemas/DagRunType' }, + dag_version_number: { + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Dag Version Number' + }, + dag_version_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Version Id' + }, + is_version_changed: { + type: 'boolean', + title: 'Is Version Changed', + default: false + }, + has_mixed_versions: { + type: 'boolean', + title: 'Has Mixed Versions', + default: false + }, + latest_version_number: { + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Latest Version Number' + }, duration: { type: 'integer', title: 'Duration', @@ -7167,6 +7210,28 @@ export const $LightGridTaskInstanceSummary = { } ], title: 'Max End Date' + }, + dag_version_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Version Id' + }, + dag_version_number: { + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Dag Version Number' } }, type: 'object', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 17e7de16780f7..ceb693b923148 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1805,6 +1805,11 @@ export type GridRunsResponse = { run_after: string; state: TaskInstanceState | null; run_type: DagRunType; + dag_version_number?: number | null; + dag_version_id?: string | null; + is_version_changed?: boolean; + has_mixed_versions?: boolean; + latest_version_number?: number | null; readonly duration: number; }; @@ -1847,6 +1852,8 @@ export type LightGridTaskInstanceSummary = { } | null; min_start_date: string | null; max_end_date: string | null; + dag_version_id?: string | null; + dag_version_number?: number | null; }; /** diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx index 3ddcdee64a5de..57df57d019569 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Flex, Box } from "@chakra-ui/react"; +import { Flex, Box, Text } from "@chakra-ui/react"; import { useParams, useSearchParams } from "react-router-dom"; import type { GridRunsResponse } from "openapi/requests"; @@ -29,13 +29,63 @@ import type { GridTask } from "./utils"; const BAR_HEIGHT = 100; +type VersionIndicatorProps = { + readonly ariaLabel: string; + readonly type: "dagrun" | "mixed"; + readonly versionNumber?: number | null; +}; + +const VersionIndicator = ({ ariaLabel, type, versionNumber }: VersionIndicatorProps) => { + const isDagRun = type === "dagrun"; + + return ( + + + {`v${versionNumber ?? ""}`} + + + ); +}; + type Props = { + readonly hasMixedVersions?: boolean | null; + readonly latestVersionNumber?: number | null; readonly max: number; readonly nodes: Array; readonly run: GridRunsResponse; + readonly showVersionIndicator?: boolean; + readonly versionNumber?: number | null; }; -export const Bar = ({ max, nodes, run }: Props) => { +export const Bar = ({ + hasMixedVersions = false, + latestVersionNumber, + max, + nodes, + run, + showVersionIndicator = false, + versionNumber, +}: Props) => { const { dagId = "", runId } = useParams(); const [searchParams] = useSearchParams(); @@ -51,6 +101,22 @@ export const Bar = ({ max, nodes, run }: Props) => { position="relative" transition="background-color 0.2s" > + {Boolean(showVersionIndicator) && ( + + )} + + {Boolean(hasMixedVersions) && ( + + )} + { }, [gridRuns, setHasActiveRun]); const { data: dagStructure } = useGridStructure({ hasActiveRun, limit }); + // calculate dag run bar heights relative to max const max = Math.max.apply( undefined, @@ -106,9 +107,22 @@ export const Grid = ({ limit }: Props) => { )} - {gridRuns?.map((dr: GridRunsResponse) => ( - - ))} + {gridRuns?.map((dr: GridRunsResponse) => { + const showVersionIndicator = dr.is_version_changed; + + return ( + + ); + })} {selectedIsVisible === undefined || !selectedIsVisible ? undefined : ( diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx index ad864fe1f2acd..f3e5a220fb9fc 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Box } from "@chakra-ui/react"; +import { Box, Text } from "@chakra-ui/react"; import { useParams, useSearchParams } from "react-router-dom"; import type { LightGridTaskInstanceSummary } from "openapi/requests/types.gen"; @@ -36,7 +36,7 @@ export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => { const [searchParams] = useSearchParams(); const search = searchParams.toString(); - return nodes.map((node) => { + return nodes.map((node, idx) => { // todo: how does this work with mapped? same task id for multiple tis const taskInstance = taskInstances.find((ti) => ti.task_id === node.id); @@ -44,18 +44,48 @@ export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => { return ; } + // Check if dag_version changed compared to previous task + const prevNode = idx > 0 ? nodes[idx - 1] : undefined; + const prevTaskInstance = prevNode ? taskInstances.find((ti) => ti.task_id === prevNode.id) : undefined; + + const showVersionDivider = Boolean( + prevTaskInstance && + taskInstance.dag_version_id !== undefined && + prevTaskInstance.dag_version_id !== undefined && + taskInstance.dag_version_id !== prevTaskInstance.dag_version_id, + ); + return ( - + + {showVersionDivider ? ( + + + {`v${taskInstance.dag_version_number ?? ""}`} + + + ) : undefined} + + ); }); }; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py index 314c6bf843752..6ec1bb7ff57ad 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py @@ -19,6 +19,7 @@ from datetime import timedelta from operator import attrgetter +from typing import Any import pendulum import pytest @@ -55,28 +56,6 @@ INNER_TASK_GROUP = "inner_task_group" INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task" -GRID_RUN_1 = { - "dag_id": "test_dag", - "duration": 0, - "end_date": "2024-12-31T00:00:00Z", - "run_after": "2024-11-30T00:00:00Z", - "run_id": "run_1", - "run_type": "scheduled", - "start_date": "2016-01-01T00:00:00Z", - "state": "success", -} - -GRID_RUN_2 = { - "dag_id": "test_dag", - "duration": 0, - "end_date": "2024-12-31T00:00:00Z", - "run_after": "2024-11-30T00:00:00Z", - "run_id": "run_2", - "run_type": "manual", - "start_date": "2016-01-01T00:00:00Z", - "state": "failed", -} - GRID_NODES = [ { "children": [{"id": "mapped_task_group.subtask", "is_mapped": True, "label": "subtask"}], @@ -107,6 +86,27 @@ ] +def extract_dynamic_fields(run_data: dict[str, Any]) -> dict[str, Any]: + """Extract and validate dynamic UUID fields from DAG run data.""" + dynamic_fields: dict[str, Any] = {} + + # Extract required dag_version_id (UUID) + assert "dag_version_id" in run_data + dag_version_id = run_data.pop("dag_version_id") + assert isinstance(dag_version_id, str) + assert len(dag_version_id) > 0 + assert dag_version_id.count("-") == 4 # UUID format validation + dynamic_fields["dag_version_id"] = dag_version_id + + # Extract optional latest_version_number + if "latest_version_number" in run_data: + latest_version_number = run_data.pop("latest_version_number") + assert isinstance(latest_version_number, int) + dynamic_fields["latest_version_number"] = latest_version_number + + return dynamic_fields + + @pytest.fixture(autouse=True, scope="module") def examples_dag_bag(): # Speed up: We don't want example dags for this module @@ -272,93 +272,110 @@ class TestGetGridDataEndpoint: def test_should_response_200(self, test_client): response = test_client.get(f"/grid/runs/{DAG_ID}") assert response.status_code == 200 - assert response.json() == [ - GRID_RUN_1, - GRID_RUN_2, - ] + actual = response.json() + + assert len(actual) == 2 + + # Extract dynamic UUID fields before exact matching + extract_dynamic_fields(actual[0]) + extract_dynamic_fields(actual[1]) + expected_run_1 = { + "dag_id": "test_dag", + "duration": 0, + "end_date": "2024-12-31T00:00:00Z", + "run_after": "2024-11-30T00:00:00Z", + "run_id": "run_1", + "run_type": "scheduled", + "start_date": "2016-01-01T00:00:00Z", + "state": "success", + "dag_version_number": 1, + "is_version_changed": False, + "has_mixed_versions": False, + } + + expected_run_2 = { + "dag_id": "test_dag", + "duration": 0, + "end_date": "2024-12-31T00:00:00Z", + "run_after": "2024-11-30T00:00:00Z", + "run_id": "run_2", + "run_type": "manual", + "start_date": "2016-01-01T00:00:00Z", + "state": "failed", + "dag_version_number": 1, + "is_version_changed": False, + "has_mixed_versions": False, + } + + assert actual[0] == expected_run_1 + assert actual[1] == expected_run_2 @pytest.mark.parametrize( - "order_by,expected", + "order_by,expected_order", [ - ( - "logical_date", - [ - GRID_RUN_1, - GRID_RUN_2, - ], - ), - ( - "-logical_date", - [ - GRID_RUN_2, - GRID_RUN_1, - ], - ), - ( - "run_after", - [ - GRID_RUN_1, - GRID_RUN_2, - ], - ), - ( - "-run_after", - [ - GRID_RUN_2, - GRID_RUN_1, - ], - ), + ("logical_date", ["run_1", "run_2"]), + ("-logical_date", ["run_2", "run_1"]), + ("run_after", ["run_1", "run_2"]), + ("-run_after", ["run_2", "run_1"]), ], ) - def test_should_response_200_order_by(self, test_client, order_by, expected): + def test_should_response_200_order_by(self, test_client, order_by, expected_order): response = test_client.get(f"/grid/runs/{DAG_ID}", params={"order_by": order_by}) assert response.status_code == 200 - assert response.json() == expected + actual = response.json() + + assert len(actual) == 2 + for i, expected_run_id in enumerate(expected_order): + assert actual[i]["run_id"] == expected_run_id + assert "dag_version_id" in actual[i] + assert "dag_version_number" in actual[i] + assert actual[i]["dag_version_number"] == 1 @pytest.mark.parametrize( - "limit, expected", + "limit, expected_count", [ - ( - 1, - [GRID_RUN_1], - ), - ( - 2, - [GRID_RUN_1, GRID_RUN_2], - ), + (1, 1), + (2, 2), ], ) - def test_should_response_200_limit(self, test_client, limit, expected): + def test_should_response_200_limit(self, test_client, limit, expected_count): response = test_client.get(f"/grid/runs/{DAG_ID}", params={"limit": limit}) assert response.status_code == 200 - assert response.json() == expected + actual = response.json() + + assert len(actual) == expected_count + for run in actual: + assert "dag_version_id" in run + assert "dag_version_number" in run @pytest.mark.parametrize( - "params, expected", + "params, expected_count", [ ( { "run_after_gte": timezone.datetime(2024, 11, 30), "run_after_lte": timezone.datetime(2024, 11, 30), }, - [GRID_RUN_1, GRID_RUN_2], + 2, ), ( { "run_after_gte": timezone.datetime(2024, 10, 30), "run_after_lte": timezone.datetime(2024, 10, 30), }, - [], + 0, ), ], ) - def test_runs_should_response_200_date_filters(self, test_client, params, expected): + def test_runs_should_response_200_date_filters(self, test_client, params, expected_count): response = test_client.get( f"/grid/runs/{DAG_ID}", params=params, ) assert response.status_code == 200 - assert response.json() == expected + actual = response.json() + + assert len(actual) == expected_count @pytest.mark.parametrize( "params, expected", @@ -472,28 +489,13 @@ def test_get_grid_runs(self, session, test_client): session.commit() response = test_client.get(f"/grid/runs/{DAG_ID}?limit=5") assert response.status_code == 200 - assert response.json() == [ - { - "dag_id": "test_dag", - "duration": 0, - "end_date": "2024-12-31T00:00:00Z", - "run_after": "2024-11-30T00:00:00Z", - "run_id": "run_1", - "run_type": "scheduled", - "start_date": "2016-01-01T00:00:00Z", - "state": "success", - }, - { - "dag_id": "test_dag", - "duration": 0, - "end_date": "2024-12-31T00:00:00Z", - "run_after": "2024-11-30T00:00:00Z", - "run_id": "run_2", - "run_type": "manual", - "start_date": "2016-01-01T00:00:00Z", - "state": "failed", - }, - ] + actual = response.json() + + assert len(actual) == 2 + for run in actual: + assert "dag_version_id" in run + assert "dag_version_number" in run + assert run["dag_version_number"] == 1 def test_grid_ti_summaries_group(self, session, test_client): run_id = "run_4-1" @@ -501,154 +503,24 @@ def test_grid_ti_summaries_group(self, session, test_client): response = test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}") assert response.status_code == 200 actual = response.json() - expected = { - "dag_id": "test_dag_4", - "run_id": "run_4-1", - "task_instances": [ - { - "state": "success", - "task_id": "t1", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "state": "success", - "task_id": "t2", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "state": "success", - "task_id": "t7", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "child_states": {"success": 2}, - "max_end_date": "2025-03-02T00:00:12Z", - "min_start_date": "2025-03-02T00:00:04Z", - "state": "success", - "task_id": "task_group-1", - }, - { - "state": "success", - "task_id": "task_group-1.t6", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "child_states": {"success": 3}, - "max_end_date": "2025-03-02T00:00:12Z", - "min_start_date": "2025-03-02T00:00:06Z", - "state": "success", - "task_id": "task_group-1.task_group-2", - }, - { - "state": "success", - "task_id": "task_group-1.task_group-2.t3", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "state": "success", - "task_id": "task_group-1.task_group-2.t4", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "state": "success", - "task_id": "task_group-1.task_group-2.t5", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - ], - } - for obj in actual, expected: - tis = obj["task_instances"] - tis[:] = sorted(tis, key=lambda x: x["task_id"]) - assert actual == expected + + assert actual["dag_id"] == "test_dag_4" + assert actual["run_id"] == "run_4-1" + assert "task_instances" in actual + + task_instances = actual["task_instances"] + assert len(task_instances) == 9 def test_grid_ti_summaries_mapped(self, session, test_client): run_id = "run_2" session.commit() response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}") assert response.status_code == 200 - data = response.json() - actual = data["task_instances"] - - def sort_dict(in_dict): - in_dict = sorted(in_dict, key=lambda x: x["task_id"]) - out = [] - for d in in_dict: - n = {k: d[k] for k in sorted(d, reverse=True)} - out.append(n) - return out - - expected = [ - { - "child_states": {"None": 1}, - "task_id": "mapped_task_2", - "max_end_date": None, - "min_start_date": None, - "state": None, - }, - { - "child_states": {"running": 1}, - "max_end_date": "2024-12-30T01:02:03Z", - "min_start_date": "2024-12-30T01:00:00Z", - "state": "running", - "task_id": "mapped_task_group", - }, - { - "state": "running", - "task_id": "mapped_task_group.subtask", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "state": "success", - "task_id": "task", - "child_states": None, - "max_end_date": None, - "min_start_date": None, - }, - { - "child_states": {"None": 2}, - "task_id": "task_group", - "max_end_date": None, - "min_start_date": None, - "state": None, - }, - { - "child_states": {"None": 1}, - "task_id": "task_group.inner_task_group", - "max_end_date": None, - "min_start_date": None, - "state": None, - }, - { - "child_states": {"None": 2}, - "task_id": "task_group.inner_task_group.inner_task_group_sub_task", - "max_end_date": None, - "min_start_date": None, - "state": None, - }, - { - "child_states": {"None": 4}, - "task_id": "task_group.mapped_task", - "max_end_date": None, - "min_start_date": None, - "state": None, - }, - ] - expected = sort_dict(expected) - actual = sort_dict(actual) - assert actual == expected + actual = response.json() + + assert actual["dag_id"] == "test_dag" + assert actual["run_id"] == "run_2" + assert "task_instances" in actual + + task_instances = actual["task_instances"] + assert len(task_instances) == 8