Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
The exception occurred when a DAG run with `dag.test()` attempted to retrieve a variable from the API server. Similar issues have been opened before (#48554, #51062, #51316). The PRs provided as a solution (#50300, #50419) were included in 3.0.2 but did not fix the problem.
```
Exception has occurred: ImportError
cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
  File "/workspaces/airflow/test.py", line 7, in <module>
    x = Variable.get("my_variable")
        ^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
```
What you think should happen instead?
The variable should have been successfully retrieved without exceptions.
How to reproduce
- Set the variable: `airflow variables set my_variable my_value`
- Run the DAG:
```python
import logging
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable

x = Variable.get("my_variable")


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    start = EmptyOperator(task_id="start")
    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": x,
        },
    )
    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()
```
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
Extended image based on apache/airflow:slim-3.0.2-python3.12
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Activity
amoghrajesh commented on Jun 17, 2025
CC: @kaxil we should pick this up for 3.0.3 if it's actually an issue; haven't triaged it.
kaxil commented on Jun 17, 2025
Yeah, it is an issue. The problem is that the DAG is already parsed before the `dag.test` code can be called, which would assign `task_runner.SUPERVISOR_COMMS = InProcessSupervisorComms`.
ashb commented on Jun 17, 2025
Assuming we can detect the case when this is being imported from doing `python mydag.py` (which I'm 95% sure we can, so that is the "easy" part), how should we actually obtain a value/variable? I.e., ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?
kaxil commented on Jun 17, 2025
Since we allow that in the actual DAG parsing model (as it goes via DAG processor -> supervisor comms -> Variable), we should do the same for `dag.test`.
Now regarding the security model: currently the Task Exec API is tied to Task Identity, so this becomes a no-go, and changing that would be a breaking change. (Unless of course we want to include DAG file identity too somehow.)
kaxil commented on Jun 17, 2025
@opeida The ideal way would be to not access Variables or Connections at the top level of your file (i.e. outside the task context). In your case, you can rewrite the DAG as follows:
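(The code block from this comment is not shown here; as a minimal sketch of such a rewrite, assuming the same DAG and variable name as in the reproduction, the value can be resolved via Jinja templating inside the task context instead of at parse time:)

```python
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    start = EmptyOperator(task_id="start")
    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        # Rendered at task runtime inside the task context, so no Variable
        # access happens at parse time.
        op_kwargs={"my_var": "{{ var.value.my_variable }}"},
    )
    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()
```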
ashb commented on Jun 17, 2025
So: detecting when we have top-level DAG code, at least in the `python mydagfile.py` case: (Put this in `airflow/sdk/execution_time/task_runner.py`. It won't help for cases in unit tests where someone imports the DAG, but that could be handled in other cases.)
So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?
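(The snippet referenced above is not shown here. As a rough, purely hypothetical sketch of that kind of detection -- not the original code -- one possible signal is that `python mydagfile.py` leaves `__main__.__spec__` unset, unlike `python -m ...` invocations:)

```python
import sys


def _looks_like_direct_script_run() -> bool:
    # Hypothetical sketch only, not the snippet from the comment above.
    # Idea: with ``python mydagfile.py`` the DAG file itself is the
    # ``__main__`` module and ``__main__.__spec__`` is None, whereas
    # ``python -m ...`` style invocations set ``__spec__``.
    main = sys.modules.get("__main__")
    return (
        main is not None
        and getattr(main, "__spec__", None) is None
        and hasattr(main, "__file__")
    )
```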
opeida commented on Jun 17, 2025
@kaxil are there workarounds to eliminate the use of top-level variables for dynamic DAG generation?
We pull data from numerous accounts of a third-party provider. The accounts variable can change dynamically and stores sensitive information, including API keys. In all other cases, we use Jinja templates within the task context, following Airflow's best practices.
kaxil commented on Jun 17, 2025
You can just do the following in your case:
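(Again, the code block from this comment is not shown here; one plausible sketch, assuming the same variable name as above, is to move the `Variable.get` call into the callable so it runs inside the task context:)

```python
import logging

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable


def my_function() -> None:
    # Sketch: resolved at task run time, where supervisor comms and the
    # Task Execution API are available.
    my_var = Variable.get("my_variable")
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    PythonOperator(task_id="py_func", python_callable=my_function)

if __name__ == "__main__":
    dag.test()
```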
kaxil commented on Jun 17, 2025
Aah 💡.
Unsure right now -- need to think it through.
opeida commented on Jun 17, 2025
@kaxil the provided DAG only reproduces the issue and does not reflect our actual case. We need the ability to trigger a certain account, gather metrics to StatsD for each account, and use the additional flexibility that dynamic DAG generation offers.
Thank you for your reply. I look forward to testing the fix if one is provided.
Feel free to ping me if you need any feedback from the user perspective.
UPD: importing `Variable` from `airflow.models` instead of `airflow.sdk` fixes the problem, but it doesn't seem like a best-practice solution.
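(For reference, the workaround described in the UPD is just the import swap:)

```python
from airflow.models import Variable  # instead of: from airflow.sdk import Variable

x = Variable.get("my_variable")  # works because it falls back to direct DB access
```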
ashb commented on Jun 18, 2025
Yeah, that still works as it's falling back to the direct DB-access-based approach.