Skip to content

Remove more usages of legacy grid data endpoint #52302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class LightGridTaskInstanceSummary(BaseModel):

task_id: str
state: TaskInstanceState | None
child_states: dict[TaskInstanceState, int] | None
min_start_date: datetime | None
max_end_date: datetime | None


class GridDAGRunwithTIs(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2005,10 +2005,34 @@ components:
anyOf:
- $ref: '#/components/schemas/TaskInstanceState'
- type: 'null'
child_states:
anyOf:
- additionalProperties:
type: integer
propertyNames:
$ref: '#/components/schemas/TaskInstanceState'
type: object
- type: 'null'
title: Child States
min_start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Min Start Date
max_end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Max End Date
type: object
required:
- task_id
- state
- child_states
- min_start_date
- max_end_date
title: LightGridTaskInstanceSummary
description: Task Instance Summary model for the Grid UI.
MenuItem:
Expand Down
40 changes: 28 additions & 12 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import collections
import itertools
from typing import Annotated
from typing import TYPE_CHECKING, Annotated

import structlog
from fastapi import Depends, HTTPException, status
Expand Down Expand Up @@ -507,6 +507,8 @@ def get_grid_ti_summaries(
TaskInstance.task_id,
TaskInstance.state,
TaskInstance.dag_version_id,
TaskInstance.start_date,
TaskInstance.end_date,
)
.where(TaskInstance.dag_id == dag_id)
.where(
Expand All @@ -519,29 +521,43 @@ def get_grid_ti_summaries(
return_total_entries=False,
)
task_instances = list(session.execute(tis_of_dag_runs))
task_id_states = collections.defaultdict(list)
if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} run_id={run_id}"
)
ti_details = collections.defaultdict(list)
for ti in task_instances:
task_id_states[ti.task_id].append(ti.state)

ti_details[ti.task_id].append(
{
"state": ti.state,
"start_date": ti.start_date,
"end_date": ti.end_date,
}
)
serdag = _get_serdag(
dag_id=dag_id,
dag_version_id=task_instances[0].dag_version_id,
session=session,
)
if not serdag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
tis = list(
_find_aggregates(
if TYPE_CHECKING:
assert serdag

def get_node_sumaries():
for node in _find_aggregates(
node=serdag.dag.task_group,
parent_node=None,
ti_states=task_id_states,
)
)
ti_details=ti_details,
):
if node["type"] == "task":
node["child_states"] = None
node["min_start_date"] = None
node["max_end_date"] = None
yield node

return { # type: ignore[return-value]
"run_id": run_id,
"dag_id": dag_id,
"task_instances": list(tis),
"task_instances": list(get_node_sumaries()),
}


Expand Down
44 changes: 34 additions & 10 deletions airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ def fill_task_instance_summaries(

:param grouped_task_instances: Grouped Task Instances
:param task_instance_summaries_to_fill: Task Instance Summaries to fill
:param task_node_map: Task Node Map
:param session: Session

:return: None
Expand Down Expand Up @@ -323,49 +322,74 @@ def agg_state(states):
for state in state_priority:
if state in states:
return state
return "no_status"
return None


def _get_aggs_for_node(detail):
    """
    Collapse the per-instance records of one node into a single aggregate.

    :param detail: Iterable of dicts, each carrying ``state``, ``start_date`` and ``end_date`` keys

    :return: Dict with the aggregated ``state`` (as chosen by ``agg_state``), the earliest
        truthy start date as ``min_start_date``, the latest truthy end date as ``max_end_date``
        (each ``None`` when no record has one), and ``child_states`` counting every non-``None``
        state
    """
    # Records without a state are ignored for both the aggregate state and the counts.
    known_states = [entry["state"] for entry in detail if entry["state"] is not None]

    # Only truthy dates take part; ``default=None`` covers the "no dates at all" case.
    started = [entry["start_date"] for entry in detail if entry["start_date"]]
    earliest_start = min(started, default=None)
    ended = [entry["end_date"] for entry in detail if entry["end_date"]]
    latest_end = max(ended, default=None)

    return {
        "state": agg_state(known_states),
        "min_start_date": earliest_start,
        "max_end_date": latest_end,
        "child_states": dict(Counter(known_states)),
    }


def _find_aggregates(
node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap,
parent_node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap | None,
ti_states: dict[str, list[str]],
ti_details: dict[str, list],
) -> Iterable[dict]:
"""Recursively fill the Task Group Map."""
node_id = node.node_id
parent_id = parent_node.node_id if parent_node else None
details = ti_details[node_id]

if node is None:
return

if isinstance(node, MappedOperator):
yield {
"task_id": node_id,
"type": "mapped_task",
"parent_id": parent_id,
"state": agg_state(ti_states[node_id]),
**_get_aggs_for_node(details),
}

return
if isinstance(node, TaskGroup):
states = []
children = []
for child in get_task_group_children_getter()(node):
for child_node in _find_aggregates(node=child, parent_node=node, ti_states=ti_states):
states.append(child_node["state"])
for child_node in _find_aggregates(node=child, parent_node=node, ti_details=ti_details):
if child_node["parent_id"] == node_id:
children.append(
{
"state": child_node["state"],
"start_date": child_node["min_start_date"],
"end_date": child_node["max_end_date"],
}
)
yield child_node
if node_id:
yield {
"task_id": node_id,
"type": "group",
"parent_id": parent_id,
"state": agg_state(states),
**_get_aggs_for_node(children),
}
return
if isinstance(node, BaseOperator):
yield {
"task_id": node_id,
"type": "task",
"parent_id": parent_id,
"state": agg_state(ti_states[node_id]),
**_get_aggs_for_node(details),
}
return
Original file line number Diff line number Diff line change
Expand Up @@ -7135,10 +7135,51 @@ export const $LightGridTaskInstanceSummary = {
type: 'null'
}
]
},
child_states: {
anyOf: [
{
additionalProperties: {
type: 'integer'
},
propertyNames: {
'$ref': '#/components/schemas/TaskInstanceState'
},
type: 'object'
},
{
type: 'null'
}
],
title: 'Child States'
},
min_start_date: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Min Start Date'
},
max_end_date: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Max End Date'
}
},
type: 'object',
required: ['task_id', 'state'],
required: ['task_id', 'state', 'child_states', 'min_start_date', 'max_end_date'],
title: 'LightGridTaskInstanceSummary',
description: 'Task Instance Summary model for the Grid UI.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,11 @@ export type LatestRunResponse = {
export type LightGridTaskInstanceSummary = {
task_id: string;
state: TaskInstanceState | null;
child_states: {
[key: string]: (number);
} | null;
min_start_date: string | null;
max_end_date: string | null;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export const Bar = ({ max, nodes, run }: Props) => {
const isSelected = runId === run.run_id;

const search = searchParams.toString();
const { data: gridTISummaries } = useGridTiSummaries(run);
const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId: run.run_id, state: run.state });

return (
<Box
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,63 +20,25 @@ import { ReactFlowProvider } from "@xyflow/react";
import { MdOutlineTask } from "react-icons/md";
import { useParams } from "react-router-dom";

import {
useDagRunServiceGetDagRun,
useDagServiceGetDagDetails,
useGridServiceGridData,
} from "openapi/queries";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
import { isStatePending, useAutoRefresh } from "src/utils";

import { Header } from "./Header";

export const GroupTaskInstance = () => {
const { dagId = "", groupId = "", runId = "" } = useParams();
const refetchInterval = useAutoRefresh({ dagId });

const {
data: dag,
error: dagError,
isLoading: isDagLoading,
} = useDagServiceGetDagDetails({
dagId,
});

const { data: dagRun } = useDagRunServiceGetDagRun(
{
dagId,
dagRunId: runId,
},
undefined,
{ enabled: runId !== "" },
);
const { dagId = "", runId = "", taskId = "" } = useParams();

// Filter grid data to get only a single dag run
const { data, error, isLoading } = useGridServiceGridData(
{
dagId,
limit: 1,
offset: 0,
runAfterGte: dagRun?.run_after,
runAfterLte: dagRun?.run_after,
},
undefined,
{
enabled: dagRun !== undefined,
refetchInterval: (query) =>
query.state.data?.dag_runs.some((dr) => isStatePending(dr.state)) && refetchInterval,
},
);
const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
const taskInstance = gridTISummaries?.task_instances.find((ti) => ti.task_id === taskId);

const taskInstance = data?.dag_runs
.find((dr) => dr.dag_run_id === runId)
?.task_instances.find((ti) => ti.task_id === groupId);
const refetchInterval = useAutoRefresh({ dagId });

const tabs = [{ icon: <MdOutlineTask />, label: "Task Instances", value: "" }];

return (
<ReactFlowProvider>
<DetailsLayout dag={dag} error={error ?? dagError} isLoading={isLoading || isDagLoading} tabs={tabs}>
<DetailsLayout tabs={tabs}>
{taskInstance === undefined ? undefined : (
<Header
isRefreshing={Boolean(isStatePending(taskInstance.state) && Boolean(refetchInterval))}
Expand Down
Loading