Skip to content

Commit 467e80d

Browse files
authored
Merge branch 'main' into deserialize-xcom-native
2 parents 429cc3c + 0911676 commit 467e80d

File tree

64 files changed

+2024
-1044
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2024
-1044
lines changed

.github/CODEOWNERS

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@
3535
# UI
3636
/airflow-core/src/airflow/ui/ @bbovenzi @pierrejeambrun @ryanahamilton @jscheffl @shubhamraj-git
3737

38-
# Translations
38+
# Translations (i18n)
3939
airflow-core/src/airflow/ui/src/i18n/locales/de/ @jscheffl
4040
airflow-core/src/airflow/ui/src/i18n/locales/zh-TW/ @Lee-W
4141
airflow-core/src/airflow/ui/src/i18n/locales/nl/ @BasPH # not codeowner but engaged: @DjVinnii
4242
airflow-core/src/airflow/ui/src/i18n/locales/pl/ @potiuk @mobuchowski # not codeowner but engaged: @kacpermuda
43+
airflow-core/src/airflow/ui/src/i18n/locales/he/ @eladkal @shahar1 @romsharon98
4344

4445
# Security/Permissions
4546
/airflow-core/src/airflow/security/permissions.py @vincbeck

.github/boring-cyborg.yml

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,6 @@ labelPRBasedOnFilePath:
111111
provider:common-sql:
112112
- providers/common/sql/**
113113

114-
provider:standard:
115-
- providers/standard/**
116-
117114
provider:databricks:
118115
- providers/databricks/**
119116

@@ -180,6 +177,9 @@ labelPRBasedOnFilePath:
180177
provider:jenkins:
181178
- providers/jenkins/**
182179

180+
provider:keycloak:
181+
- providers/keycloak/**
182+
183183
provider:microsoft-azure:
184184
- providers/microsoft/azure/**
185185

@@ -219,7 +219,7 @@ labelPRBasedOnFilePath:
219219
provider:opsgenie:
220220
- providers/opsgenie/**
221221

222-
provider:Oracle:
222+
provider:oracle:
223223
- providers/oracle/**
224224

225225
provider:pagerduty:
@@ -279,6 +279,9 @@ labelPRBasedOnFilePath:
279279
provider:ssh:
280280
- providers/ssh/**
281281

282+
provider:standard:
283+
- providers/standard/**
284+
282285
provider:tableau:
283286
- providers/tableau/**
284287

@@ -344,6 +347,30 @@ labelPRBasedOnFilePath:
344347
- airflow-core/docs/ui.rst
345348
- airflow-core/src/airflow/ui/**/*
346349

350+
area:translations:
351+
- airflow-core/src/airflow/ui/src/i18n/**/*
352+
353+
translation:default:
354+
- airflow-core/src/airflow/ui/src/i18n/locales/en/*
355+
356+
translation:de:
357+
- airflow-core/src/airflow/ui/src/i18n/locales/de/*
358+
359+
translation:he:
360+
- airflow-core/src/airflow/ui/src/i18n/locales/he/*
361+
362+
translation:ko:
363+
- airflow-core/src/airflow/ui/src/i18n/locales/ko/*
364+
365+
translation:nl:
366+
- airflow-core/src/airflow/ui/src/i18n/locales/nl/*
367+
368+
translation:pl:
369+
- airflow-core/src/airflow/ui/src/i18n/locales/pl/*
370+
371+
translation:zh-TW:
372+
- airflow-core/src/airflow/ui/src/i18n/locales/zh-TW/*
373+
347374
area:CLI:
348375
- airflow-core/src/airflow/cli/**/*.py
349376
- airflow-core/tests/unit/cli/**/*.py

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,7 @@ repos:
629629
pass_filenames: true
630630
exclude: >
631631
(?x)
632+
^airflow-core/src/airflow/ui/src/i18n/config\.ts$|
632633
^airflow-core/src/airflow/ui/openapi-gen/|
633634
^airflow-core/src/airflow/ui/src/i18n/locales/de/README\.md$|
634635
^airflow-core/src/airflow/cli/commands/local_commands/fastapi_api_command\.py$|

airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@
6565
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
6666
from airflow.api_fastapi.core_api.services.public.task_instances import (
6767
BulkTaskInstanceService,
68+
_patch_task_instance_note,
69+
_patch_task_instance_state,
6870
_patch_ti_validate_request,
6971
)
7072
from airflow.api_fastapi.logging.decorators import action_logging
7173
from airflow.exceptions import TaskNotFound
72-
from airflow.listeners.listener import get_listener_manager
7374
from airflow.models import Base, DagRun
7475
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
7576
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
@@ -848,46 +849,22 @@ def patch_task_instance(
848849

849850
for key, _ in data.items():
850851
if key == "new_state":
851-
tis = dag.set_task_instance_state(
852+
_patch_task_instance_state(
852853
task_id=task_id,
853-
run_id=dag_run_id,
854-
map_indexes=[map_index] if map_index is not None else None,
855-
state=data["new_state"],
856-
upstream=body.include_upstream,
857-
downstream=body.include_downstream,
858-
future=body.include_future,
859-
past=body.include_past,
860-
commit=True,
854+
dag_run_id=dag_run_id,
855+
dag=dag,
856+
task_instance_body=body,
857+
data=data,
861858
session=session,
862859
)
863-
if not tis:
864-
raise HTTPException(
865-
status.HTTP_409_CONFLICT, f"Task id {task_id} is already in {data['new_state']} state"
866-
)
867-
868-
for ti in tis:
869-
try:
870-
if data["new_state"] == TaskInstanceState.SUCCESS:
871-
get_listener_manager().hook.on_task_instance_success(
872-
previous_state=None, task_instance=ti
873-
)
874-
elif data["new_state"] == TaskInstanceState.FAILED:
875-
get_listener_manager().hook.on_task_instance_failed(
876-
previous_state=None,
877-
task_instance=ti,
878-
error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
879-
)
880-
except Exception:
881-
log.exception("error calling listener")
882860

883861
elif key == "note":
884-
for ti in tis:
885-
if update_mask or body.note is not None:
886-
if ti.task_instance_note is None:
887-
ti.note = (body.note, user.get_id())
888-
else:
889-
ti.task_instance_note.content = body.note
890-
ti.task_instance_note.user_id = user.get_id()
862+
_patch_task_instance_note(
863+
task_instance_body=body,
864+
tis=tis,
865+
user=user,
866+
update_mask=update_mask,
867+
)
891868

892869
return TaskInstanceCollectionResponse(
893870
task_instances=[

airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py

Lines changed: 65 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,64 @@ def _patch_ti_validate_request(
9494
return dag, list(tis), body.model_dump(include=fields_to_update, by_alias=True)
9595

9696

97+
def _patch_task_instance_state(
98+
task_id: str,
99+
dag_run_id: str,
100+
dag: DAG,
101+
task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
102+
data: dict,
103+
session: Session,
104+
) -> None:
105+
map_index = getattr(task_instance_body, "map_index", None)
106+
map_indexes = None if map_index is None else [map_index]
107+
108+
updated_tis = dag.set_task_instance_state(
109+
task_id=task_id,
110+
run_id=dag_run_id,
111+
map_indexes=map_indexes,
112+
state=data["new_state"],
113+
upstream=task_instance_body.include_upstream,
114+
downstream=task_instance_body.include_downstream,
115+
future=task_instance_body.include_future,
116+
past=task_instance_body.include_past,
117+
commit=True,
118+
session=session,
119+
)
120+
if not updated_tis:
121+
raise HTTPException(
122+
status.HTTP_409_CONFLICT,
123+
f"Task id {task_id} is already in {data['new_state']} state",
124+
)
125+
126+
for ti in updated_tis:
127+
try:
128+
if data["new_state"] == TaskInstanceState.SUCCESS:
129+
get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti)
130+
elif data["new_state"] == TaskInstanceState.FAILED:
131+
get_listener_manager().hook.on_task_instance_failed(
132+
previous_state=None,
133+
task_instance=ti,
134+
error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
135+
)
136+
except Exception:
137+
log.exception("error calling listener")
138+
139+
140+
def _patch_task_instance_note(
141+
task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
142+
tis: list[TI],
143+
user: GetUserDep,
144+
update_mask: list[str] | None = Query(None),
145+
) -> None:
146+
for ti in tis:
147+
if update_mask or task_instance_body.note is not None:
148+
if ti.task_instance_note is None:
149+
ti.note = (task_instance_body.note, user.get_id())
150+
else:
151+
ti.task_instance_note.content = task_instance_body.note
152+
ti.task_instance_note.user_id = user.get_id()
153+
154+
97155
class BulkTaskInstanceService(BulkService[BulkTaskInstanceBody]):
98156
"""Service for handling bulk operations on task instances."""
99157

@@ -134,55 +192,6 @@ def categorize_task_instances(
134192
not_found_task_keys = {(task_id, map_index) for task_id, map_index in task_ids} - matched_task_keys
135193
return task_instances_map, matched_task_keys, not_found_task_keys
136194

137-
def _patch_task_instance_state(
138-
self,
139-
dag: DAG,
140-
task_instance_body: BulkTaskInstanceBody,
141-
data: dict,
142-
) -> None:
143-
map_indexes = None if task_instance_body.map_index is None else [task_instance_body.map_index]
144-
145-
updated_tis = dag.set_task_instance_state(
146-
task_id=task_instance_body.task_id,
147-
run_id=self.dag_run_id,
148-
map_indexes=map_indexes,
149-
state=data["new_state"],
150-
upstream=task_instance_body.include_upstream,
151-
downstream=task_instance_body.include_downstream,
152-
future=task_instance_body.include_future,
153-
past=task_instance_body.include_past,
154-
commit=True,
155-
session=self.session,
156-
)
157-
if not updated_tis:
158-
raise HTTPException(
159-
status.HTTP_409_CONFLICT,
160-
f"Task id {task_instance_body.task_id} is already in {data['new_state']} state",
161-
)
162-
for ti in updated_tis:
163-
try:
164-
if data["new_state"] == TaskInstanceState.SUCCESS:
165-
get_listener_manager().hook.on_task_instance_success(
166-
previous_state=None, task_instance=ti
167-
)
168-
elif data["new_state"] == TaskInstanceState.FAILED:
169-
get_listener_manager().hook.on_task_instance_failed(
170-
previous_state=None,
171-
task_instance=ti,
172-
error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
173-
)
174-
except Exception:
175-
log.exception("error calling listener")
176-
177-
def _patch_task_instance_note(self, task_instance_body: BulkTaskInstanceBody, tis: list[TI]) -> None:
178-
for ti in tis:
179-
if task_instance_body.note is not None:
180-
if ti.task_instance_note is None:
181-
ti.note = (task_instance_body.note, self.user.get_id())
182-
else:
183-
ti.task_instance_note.content = task_instance_body.note
184-
ti.task_instance_note.user_id = self.user.get_id()
185-
186195
def handle_bulk_create(
187196
self, action: BulkCreateAction[BulkTaskInstanceBody], results: BulkActionResponse
188197
) -> None:
@@ -232,13 +241,18 @@ def handle_bulk_update(
232241

233242
for key, _ in data.items():
234243
if key == "new_state":
235-
self._patch_task_instance_state(
244+
_patch_task_instance_state(
245+
task_id=task_instance_body.task_id,
246+
dag_run_id=self.dag_run_id,
236247
dag=dag,
237248
task_instance_body=task_instance_body,
249+
session=self.session,
238250
data=data,
239251
)
240252
elif key == "note":
241-
self._patch_task_instance_note(task_instance_body=task_instance_body, tis=tis)
253+
_patch_task_instance_note(
254+
task_instance_body=task_instance_body, tis=tis, user=self.user
255+
)
242256

243257
results.success.append(task_instance_body.task_id)
244258
except ValidationError as e:

airflow-core/src/airflow/cli/cli_config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,15 @@ def string_lower_type(val):
483483
type=positive_int(allow_zero=False),
484484
help="Wait time between retries in seconds",
485485
)
486+
ARG_DB_BATCH_SIZE = Arg(
487+
("--batch-size",),
488+
default=None,
489+
type=positive_int(allow_zero=False),
490+
help=(
491+
"Maximum number of rows to delete or archive in a single transaction.\n"
492+
"Lower values reduce long-running locks but increase the number of batches."
493+
),
494+
)
486495

487496
# pool
488497
ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
@@ -1452,6 +1461,7 @@ class GroupCommand(NamedTuple):
14521461
ARG_VERBOSE,
14531462
ARG_YES,
14541463
ARG_DB_SKIP_ARCHIVE,
1464+
ARG_DB_BATCH_SIZE,
14551465
),
14561466
),
14571467
ActionCommand(

airflow-core/src/airflow/cli/commands/db_command.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ def cleanup_tables(args):
283283
verbose=args.verbose,
284284
confirm=not args.yes,
285285
skip_archive=args.skip_archive,
286+
batch_size=args.batch_size,
286287
)
287288

288289

airflow-core/src/airflow/ui/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<!doctype html>
2-
<html lang="en" style="height: 100%">
2+
<html style="height: 100%">
33
<head>
44
<meta charset="UTF-8" />
55
<base href="{{ backend_server_base_url }}" />

airflow-core/src/airflow/ui/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
},
1818
"dependencies": {
1919
"@chakra-ui/anatomy": "^2.3.4",
20-
"@chakra-ui/react": "^3.17.0",
20+
"@chakra-ui/react": "^3.20.0",
2121
"@codemirror/lang-json": "^6.0.1",
2222
"@emotion/react": "^11.14.0",
2323
"@tanstack/react-query": "^5.75.1",

0 commit comments

Comments
 (0)