ImportError: cannot import name 'SUPERVISOR_COMMS' with dag.test() #51816

Open
@opeida

Description

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The exception occurred when a DAG run with dag.test() attempted to retrieve a variable from the API server. Similar issues have been opened (#48554, #51062, #51316). The PRs provided as solutions (#50300, #50419) were included in 3.0.2 but did not fix the problem.

Exception has occurred: ImportError
cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
  File "/workspaces/airflow/test.py", line 7, in <module>
    x = Variable.get("my_variable")
        ^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)

What you think should happen instead?

The variable should have been successfully retrieved without exceptions.

How to reproduce

  1. Set the variable: airflow variables set my_variable my_value
  2. Run DAG:
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable

x = Variable.get("my_variable")

def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": x
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Extended image based on apache/airflow:slim-3.0.2-python3.12

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Activity

added this to the Airflow 3.0.3 milestone on Jun 17, 2025
amoghrajesh commented on Jun 17, 2025

Contributor

CC: @kaxil we should pick this up for 3.0.3 if it's actually an issue; I haven't triaged it yet.

removed the needs-triage label (label for new issues that we didn't triage yet) on Jun 17, 2025
kaxil commented on Jun 17, 2025

Member

Yeah, it is an issue. The problem is that the DAG file is already parsed before the dag.test() code can be called, and that call is what would assign task_runner.SUPERVISOR_COMMS = InProcessSupervisorComms.
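The ordering problem described here can be reproduced without Airflow. The following is a minimal sketch, not Airflow's code: `fake_task_runner` is a hypothetical stand-in for `airflow.sdk.execution_time.task_runner`, `variable_get` stands in for `Variable.get`, and a plain dict plays the role of `InProcessSupervisorComms`. It shows that a `from ... import NAME` executed before the attribute has been assigned fails with an ImportError of the same kind as in the report.

```python
import sys
import types

# Hypothetical stand-in for airflow.sdk.execution_time.task_runner.
fake_task_runner = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = fake_task_runner


def variable_get(key):
    """Stand-in for Variable.get: needs the comms object at call time."""
    from fake_task_runner import SUPERVISOR_COMMS  # fails if not yet assigned

    return SUPERVISOR_COMMS[key]


def simulate(assign_comms_first):
    """Do a 'top-level' lookup before or after the comms object is assigned."""
    if assign_comms_first:
        # Placeholder for the assignment the thread attributes to dag.test().
        fake_task_runner.SUPERVISOR_COMMS = {"my_variable": "my_value"}
    try:
        return variable_get("my_variable")
    except ImportError as exc:
        return f"ImportError: {exc}"
    finally:
        # Reset so each call starts from a clean module.
        fake_task_runner.__dict__.pop("SUPERVISOR_COMMS", None)


print(simulate(assign_comms_first=False))
print(simulate(assign_comms_first=True))
```

The first call corresponds to the reported case: the lookup at the top of the DAG file runs while the file is being executed, before `dag.test()` at the bottom has had a chance to set anything up.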

ashb commented on Jun 17, 2025

Member

Assuming we can detect the case where this is being imported as a result of running python mydag.py (which I'm 95% sure we can, so that is the 'easy' part), how should we actually obtain a value/variable?

I.e., ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?

kaxil commented on Jun 17, 2025

Member

Assuming we can detect the case where this is being imported as a result of running python mydag.py (which I'm 95% sure we can, so that is the 'easy' part), how should we actually obtain a value/variable?

I.e., ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?

Since we allow that in the actual DAG parsing model (as it goes via DAG processor -> Supervisor comms -> Variable), we should do the same for dag.test.

Now, regarding the security model: currently the Task Execution API is tied to task identity, so this becomes a no-go. But that's a breaking change (unless, of course, we want to include DAG file identity too somehow).

kaxil commented on Jun 17, 2025

Member

@opeida The ideal way would be to not access Variables or Connections at the top of your file (i.e., outside the task context). In your case, you can rewrite the DAG as follows:

import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": "{{ var.value.my_variable }}"
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()
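
Why the rewritten DAG avoids the error can be illustrated with a toy renderer. This is only a sketch and assumes nothing about Airflow's actual templating engine: `get_variable`, `render`, and the regular expression are hypothetical names made up for the illustration. The point is that the templated string is just text while the file is parsed, and the lookup only happens when the placeholder is rendered.

```python
import re

lookups = []  # records every time the (fake) variable store is read


def get_variable(name):
    """Hypothetical variable store lookup; records that it was called."""
    lookups.append(name)
    return {"my_variable": "my_value"}[name]


# "Parse time": only a plain string is stored, no lookup happens here.
op_kwargs = {"my_var": "{{ var.value.my_variable }}"}

_PLACEHOLDER = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render(kwargs):
    """Toy renderer: resolve placeholders just before the callable would run."""
    return {
        key: _PLACEHOLDER.sub(lambda match: get_variable(match.group(1)), value)
        for key, value in kwargs.items()
    }
```

Calling `render(op_kwargs)` is the first point at which `get_variable` runs; until then `lookups` stays empty.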

ashb commented on Jun 17, 2025

Member

So: detecting when we have top-level dag code, at least in the python mydagfile.py case:

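import sys


def __getattr__(name):
    # Module-level __getattr__ (PEP 562): only called when the attribute is missing.
    if name == "SUPERVISOR_COMMS" and "__main__" in sys.modules:
        import traceback

        # Collect the calling frames, skipping the import machinery.
        frames = [
            frame
            for (frame, lnum) in traceback.walk_stack(None)
            if not frame.f_code.co_filename.startswith("<frozen importlib.")
        ]
        # The outermost frame is the script being run (python mydagfile.py).
        if sys.modules["__main__"].__file__ == frames[-1].f_code.co_filename:
            raise RuntimeError("Top level API access here")

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")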

(Put this in airflow/sdk/execution_time/task_runner.py. It won't help for cases in unit tests where someone imports the dag. But that could be handled in other cases)
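
The mechanism this relies on is the module-level `__getattr__` hook (PEP 562), which Python also consults during `from module import name`. A minimal standalone sketch of just that mechanism follows; `demo_task_runner` and `try_import` are hypothetical names and none of this is Airflow code. It leaves out the stack inspection and only shows that the hook can turn the generic ImportError into a more specific exception for one chosen name.

```python
import sys
import types

# Hypothetical module standing in for task_runner; not Airflow code.
demo = types.ModuleType("demo_task_runner")


def _module_getattr(name):
    """Module-level __getattr__ (PEP 562): called only for missing attributes."""
    if name == "SUPERVISOR_COMMS":
        raise RuntimeError("Top level API access here")
    raise AttributeError(f"module 'demo_task_runner' has no attribute {name!r}")


demo.__getattr__ = _module_getattr
sys.modules["demo_task_runner"] = demo


def try_import(name):
    """Return the exception type raised by `from demo_task_runner import <name>`."""
    try:
        exec(f"from demo_task_runner import {name}", {})
    except Exception as exc:
        return type(exc).__name__
    return "ok"


print(try_import("SUPERVISOR_COMMS"))
print(try_import("something_else"))
```

An AttributeError raised by the hook is converted to the usual ImportError by the import statement, while any other exception propagates to the caller unchanged, which is what makes a targeted error message possible.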

So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

opeida commented on Jun 17, 2025

Author

@opeida The ideal way would be to not access Variables or Connections at the top of your file (i.e outside Task Context)

@kaxil are there workarounds to eliminate the use of top-level variables for dynamic DAG generation?
We pull data from numerous accounts of a third-party provider. The accounts variable can change dynamically and stores sensitive information including API keys. In all other cases, we utilize Jinja templates within Task Context following Airflow's best practices.

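provider_name = "<provider_name>"
# The loop below treats each account as a dict, so the variable is
# assumed to hold JSON and is deserialized on read.
ALL_ACCOUNTS = Variable.get(f"{provider_name}_accounts", deserialize_json=True)

for account in ALL_ACCOUNTS:
    with DAG(
        f"{provider_name}.{account.get('name')}",
        default_args=default_args,
        schedule="10 * * * *",
        catchup=False,
        max_active_runs=1
    ) as dag:
        # DAG operators
        ...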
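
The loop above calls account.get('name'), which only works if the variable value has been deserialized into a list of dicts; a value read as a raw string would be iterated character by character. A minimal sketch of that step with the standard library, assuming the variable stores a JSON array (the provider name, the account names, and the `dag_ids_for` helper are hypothetical):

```python
import json

provider_name = "example_provider"  # hypothetical provider name
# Hypothetical raw value as it might be stored in the variable: a JSON string.
raw_value = '[{"name": "account_a"}, {"name": "account_b"}]'


def dag_ids_for(provider, raw):
    """Deserialize the accounts JSON and build one DAG id per account."""
    accounts = json.loads(raw)
    return [f"{provider}.{account.get('name')}" for account in accounts]


print(dag_ids_for(provider_name, raw_value))
```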

kaxil commented on Jun 17, 2025

Member

You can just do the following in your case:

provider_name = "<provider_name>"

def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": "{{ var.value." + provider_name + "_accounts }}"
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

kaxil commented on Jun 17, 2025

Member

So: detecting when we have top-level dag code, at least in the python mydagfile.py case:

def __getattr__(name):
    if name == "SUPERVISOR_COMMS" and "__main__" in sys.modules:
        import traceback

        frames = [
            frame
            for (frame, lnum) in traceback.walk_stack(None)
            if not frame.f_code.co_filename.startswith("<frozen importlib.")
        ]
        if sys.modules["__main__"].__file__ == frames[-1].f_code.co_filename:
            raise RuntimeError("Top level API access here")

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

(Put this in airflow/sdk/execution_time/task_runner.py. It won't help for cases in unit tests where someone imports the dag. But that could be handled in other cases)

aah 💡 .

So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

Unsure right now -- need to think it through

opeida commented on Jun 17, 2025

Author

py_func = PythonOperator(
    task_id="py_func",
    python_callable=my_function,
    op_kwargs={
        "my_var": "{{ var.value." + provider_name + "_accounts }}"
    }
)

@kaxil the provided DAG only reproduces the issue and does not reflect our actual case. We need the ability to trigger a specific account, send per-account metrics to StatsD, and use the additional flexibility that dynamic DAG generation offers.

Thank you for your reply. I look forward to testing the fix if one is provided.
Feel free to ping me if you need any feedback from the user perspective.

UPD: importing Variable from airflow.models instead of airflow.sdk fixes the problem, but it doesn't seem like a best-practice solution.

ashb commented on Jun 18, 2025

Member

UPD: importing Variable from airflow.models instead of airflow.sdk fixes the problem, but it doesn't seem like a best-practice solution.

Yeah, that still works as it's falling back to the direct DB access based approach

ImportError: cannot import name 'SUPERVISOR_COMMS' with dag.test() · Issue #51816 · apache/airflow