File tree Expand file tree Collapse file tree 1 file changed +29
-0
lines changed Expand file tree Collapse file tree 1 file changed +29
-0
lines changed Original file line number Diff line number Diff line change
1
+ import asyncio
2
+ from contextlib import suppress
3
+ from time import time
4
+
5
+ import pytest
6
+
7
+ from dask .distributed import Client
8
+
9
+ from dask_jobqueue .runner import AsyncCommWorld , AsyncRunner
10
+
11
+
12
+ @pytest .mark .asyncio
13
+ @pytest .mark .timeout (10 )
14
+ async def test_runner ():
15
+ commworld = AsyncCommWorld ()
16
+
17
+ async def run_code (commworld ):
18
+ with suppress (SystemExit ):
19
+ async with AsyncRunner (commworld , asynchronous = True ) as runner :
20
+ async with Client (runner , asynchronous = True ) as c :
21
+ start = time ()
22
+ while len (c .scheduler_info ()["workers" ]) != 2 :
23
+ assert time () < start + 10
24
+ await asyncio .sleep (0.2 )
25
+
26
+ assert await c .submit (lambda x : x + 1 , 10 ).result () == 11
27
+ assert await c .submit (lambda x : x + 1 , 20 ).result () == 21
28
+
29
+ await asyncio .gather (* [run_code (commworld ) for _ in range (4 )])
You can’t perform that action at this time.
0 commit comments