Skip to content

[v3-0-test] Unify connection not found exceptions between AF2 and AF3 (#52968) #53093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,7 @@ def get_connection_from_secrets(cls, conn_id: str) -> Connection:
return conn
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
log.debug("Unable to retrieve connection from MetastoreBackend using Task SDK")
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None
raise

# check cache first
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def dag_in_a_fn():
assert result is not None
assert result.import_errors != {}
if result.import_errors:
assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values()))
assert "The conn_id `my_conn` isn't defined" in next(iter(result.import_errors.values()))

def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_client):
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
Expand Down
10 changes: 8 additions & 2 deletions task-sdk/src/airflow/sdk/definitions/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

import attrs

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -139,7 +140,12 @@ def get_hook(self, *, hook_params=None):
def get(cls, conn_id: str) -> Any:
from airflow.sdk.execution_time.context import _get_connection

return _get_connection(conn_id)
try:
return _get_connection(conn_id)
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None
raise

@property
def extra_dejson(self) -> dict:
Expand Down
4 changes: 3 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ class ConnectionAccessor:
"""Wrapper to access Connection entries in template."""

def __getattr__(self, conn_id: str) -> Any:
return _get_connection(conn_id)
from airflow.sdk.definitions.connection import Connection

return Connection.get(conn_id)

def __repr__(self) -> str:
return "<ConnectionAccessor (dynamic access)>"
Expand Down
12 changes: 10 additions & 2 deletions task-sdk/tests/task_sdk/definitions/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import pytest

from airflow.configuration import initialize_secrets_backends
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.sdk import Connection
from airflow.sdk.execution_time.comms import ConnectionResult
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

from tests_common.test_utils.config import conf_vars
Expand Down Expand Up @@ -121,6 +122,13 @@ def test_conn_get(self, mock_supervisor_comms):
extra=None,
)

def test_conn_get_not_found(self, mock_supervisor_comms):
error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND)
mock_supervisor_comms.send.return_value = error_response

with pytest.raises(AirflowNotFoundException, match="The conn_id `mysql_conn` isn't defined"):
_ = Connection.get(conn_id="mysql_conn")


class TestConnectionsFromSecrets:
def test_get_connection_secrets_backend(self, mock_supervisor_comms, tmp_path):
Expand Down