@@ -28,23 +28,24 @@ def mock_config():
28
28
29
29
30
30
@pytest .fixture
31
- def subprocess_manager (mock_config ):
31
+ def subprocess_manager (mock_config ): # pylint: disable=redefined-outer-name
32
32
''' Create SubprocessManager for testing '''
33
33
with patch ('importlib.import_module' ):
34
34
manager = nowplaying .subprocesses .SubprocessManager (config = mock_config , testmode = True )
35
35
# Mock the process modules
36
- for name in manager .processes :
37
- manager . processes [ name ] ['module' ] = MagicMock ()
36
+ for _ , process_info in manager .processes . items () :
37
+ process_info ['module' ] = MagicMock ()
38
38
yield manager
39
39
40
40
41
- def test_subprocess_manager_init (mock_config ):
41
+ def test_subprocess_manager_init (mock_config ): # pylint: disable=redefined-outer-name
42
42
''' Test SubprocessManager initialization '''
43
- with patch ('importlib.import_module' ) as mock_import :
43
+ with patch ('importlib.import_module' ):
44
44
manager = nowplaying .subprocesses .SubprocessManager (config = mock_config , testmode = True )
45
45
46
46
# Should import all expected modules
47
- expected_processes = ['trackpoll' , 'obsws' , 'twitchbot' , 'discordbot' , 'webserver' , 'kickbot' ]
47
+ expected_processes = ['trackpoll' , 'obsws' , 'twitchbot' , 'discordbot' ,
48
+ 'webserver' , 'kickbot' ]
48
49
assert len (manager .processes ) == len (expected_processes )
49
50
50
51
for process_name in expected_processes :
@@ -54,7 +55,7 @@ def test_subprocess_manager_init(mock_config):
54
55
assert 'stopevent' in manager .processes [process_name ]
55
56
56
57
57
- def test_subprocess_manager_beam_mode (mock_config ):
58
+ def test_subprocess_manager_beam_mode (mock_config ): # pylint: disable=redefined-outer-name
58
59
''' Test SubprocessManager in beam mode '''
59
60
mock_config .cparser .value .side_effect = lambda key , ** kwargs : {
60
61
'control/beam' : True ,
@@ -70,7 +71,7 @@ def test_subprocess_manager_beam_mode(mock_config):
70
71
assert process_name in manager .processes
71
72
72
73
73
- def test_start_process_with_conditions (subprocess_manager ):
74
+ def test_start_process_with_conditions (subprocess_manager ): # pylint: disable=redefined-outer-name
74
75
''' Test conditional process starting '''
75
76
manager = subprocess_manager
76
77
@@ -82,7 +83,7 @@ def test_start_process_with_conditions(subprocess_manager):
82
83
mock_start .assert_called_once_with ('twitchbot' )
83
84
84
85
85
- def test_start_process_disabled (subprocess_manager ):
86
+ def test_start_process_disabled (subprocess_manager ): # pylint: disable=redefined-outer-name
86
87
''' Test that disabled processes don't start '''
87
88
manager = subprocess_manager
88
89
manager .config .cparser .value .side_effect = lambda key , ** kwargs : {
@@ -94,7 +95,7 @@ def test_start_process_disabled(subprocess_manager):
94
95
mock_start .assert_not_called ()
95
96
96
97
97
- class MockProcess :
98
+ class MockProcess : # pylint: disable=too-many-instance-attributes
98
99
''' Mock multiprocessing.Process for testing '''
99
100
100
101
def __init__ (self , target = None , name = None , args = None ):
@@ -108,9 +109,11 @@ def __init__(self, target=None, name=None, args=None):
108
109
self ._closed = False
109
110
110
111
def start (self ):
112
+ ''' Mock process start '''
111
113
self ._started = True
112
114
113
115
def join (self , timeout = None ):
116
+ ''' Mock process join '''
114
117
# Simulate quick shutdown for most processes
115
118
if self .name in ['webserver' , 'obsws' ]:
116
119
self ._alive = False
@@ -119,19 +122,22 @@ def join(self, timeout=None):
119
122
self ._alive = False
120
123
121
124
def terminate (self ):
125
+ ''' Mock process terminate '''
122
126
self ._terminated = True
123
127
self ._alive = False
124
128
125
129
def is_alive (self ):
130
+ ''' Mock process is_alive check '''
126
131
return self ._alive
127
132
128
133
def close (self ):
134
+ ''' Mock process close '''
129
135
self ._closed = True
130
136
131
137
132
138
# Ensure test doesn't hang - using manual timing instead of pytest-timeout
133
139
@pytest .mark .performance
134
- def test_parallel_shutdown_performance (subprocess_manager ):
140
+ def test_parallel_shutdown_performance (subprocess_manager ): # pylint: disable=redefined-outer-name
135
141
''' Test that parallel shutdown is faster than sequential '''
136
142
manager = subprocess_manager
137
143
@@ -158,57 +164,57 @@ def test_parallel_shutdown_performance(subprocess_manager):
158
164
assert manager .processes [name ]['process' ] is None
159
165
160
166
161
- def test_stop_process_parallel_graceful_shutdown (subprocess_manager ):
167
+ def test_stop_process_parallel_graceful_shutdown (subprocess_manager ): # pylint: disable=redefined-outer-name
162
168
''' Test graceful shutdown in parallel method '''
163
169
manager = subprocess_manager
164
170
165
171
mock_process = MockProcess (name = 'webserver' )
166
- mock_process ._alive = True # Start alive
172
+ mock_process ._alive = True # Start alive # pylint: disable=protected-access
167
173
manager .processes ['webserver' ]['process' ] = mock_process
168
174
169
175
# Mock join to simulate quick shutdown
170
176
with patch .object (mock_process , 'join' ) as mock_join :
171
177
mock_join .side_effect = lambda timeout : setattr (mock_process , '_alive' , False )
172
178
173
- manager ._stop_process_parallel ('webserver' )
179
+ manager ._stop_process_parallel ('webserver' ) # pylint: disable=protected-access
174
180
175
181
# Should call join with 8 second timeout
176
182
mock_join .assert_called_with (8 )
177
183
assert manager .processes ['webserver' ]['process' ] is None
178
184
179
185
180
- def test_stop_process_parallel_forced_termination (subprocess_manager ):
186
+ def test_stop_process_parallel_forced_termination (subprocess_manager ): # pylint: disable=redefined-outer-name
181
187
''' Test forced termination when graceful shutdown fails '''
182
188
manager = subprocess_manager
183
189
184
190
mock_process = MockProcess (name = 'trackpoll' )
185
191
# Simulate a stuck process that doesn't respond to graceful shutdown
186
- mock_process ._alive = True
192
+ mock_process ._alive = True # pylint: disable=protected-access
187
193
manager .processes ['trackpoll' ]['process' ] = mock_process
188
194
189
195
with patch .object (mock_process , 'join' ) as mock_join , \
190
196
patch .object (mock_process , 'terminate' ) as mock_terminate :
191
197
192
198
# Mock join to keep process alive on first call, then die on second
193
- def mock_join_behavior (timeout ):
199
+ def mock_join_behavior (timeout ): # pylint: disable=unused-argument
194
200
if mock_join .call_count == 1 :
195
201
# First call (graceful) - process stays alive
196
202
pass
197
203
else :
198
204
# Second call (after terminate) - process dies
199
- mock_process ._alive = False
205
+ mock_process ._alive = False # pylint: disable=protected-access
200
206
201
207
mock_join .side_effect = mock_join_behavior
202
208
203
- manager ._stop_process_parallel ('trackpoll' )
209
+ manager ._stop_process_parallel ('trackpoll' ) # pylint: disable=protected-access
204
210
205
211
# Should try graceful first, then terminate
206
212
assert mock_join .call_count == 2
207
213
mock_terminate .assert_called_once ()
208
214
assert manager .processes ['trackpoll' ]['process' ] is None
209
215
210
216
211
- def test_stop_process_parallel_twitchbot_special_handling (subprocess_manager ):
217
+ def test_stop_process_parallel_twitchbot_special_handling (subprocess_manager ): # pylint: disable=redefined-outer-name
212
218
''' Test special handling for twitchbot process '''
213
219
manager = subprocess_manager
214
220
@@ -222,13 +228,13 @@ def test_stop_process_parallel_twitchbot_special_handling(subprocess_manager):
222
228
with patch .object (mock_process , 'join' ) as mock_join :
223
229
mock_join .side_effect = lambda timeout : setattr (mock_process , '_alive' , False )
224
230
225
- manager ._stop_process_parallel ('twitchbot' )
231
+ manager ._stop_process_parallel ('twitchbot' ) # pylint: disable=protected-access
226
232
227
233
# Should call the special stop function with PID
228
234
mock_stop_func .assert_called_once_with (mock_process .pid )
229
235
230
236
231
- def test_stop_process_parallel_error_handling (subprocess_manager ):
237
+ def test_stop_process_parallel_error_handling (subprocess_manager ): # pylint: disable=redefined-outer-name
232
238
''' Test error handling during parallel shutdown '''
233
239
manager = subprocess_manager
234
240
@@ -242,13 +248,13 @@ def test_stop_process_parallel_error_handling(subprocess_manager):
242
248
mock_join .side_effect = lambda timeout : setattr (mock_process , '_alive' , False )
243
249
244
250
# Should not raise exception, just log and continue
245
- manager ._stop_process_parallel ('webserver' )
251
+ manager ._stop_process_parallel ('webserver' ) # pylint: disable=protected-access
246
252
247
253
# Process should still be cleaned up
248
254
assert manager .processes ['webserver' ]['process' ] is None
249
255
250
256
251
- def test_stop_all_processes_with_timeout (subprocess_manager ):
257
+ def test_stop_all_processes_with_timeout (subprocess_manager ): # pylint: disable=redefined-outer-name
252
258
''' Test that stop_all_processes handles timeouts gracefully '''
253
259
manager = subprocess_manager
254
260
@@ -270,7 +276,7 @@ def test_stop_all_processes_with_timeout(subprocess_manager):
270
276
assert manager .processes [name ]['process' ] is None
271
277
272
278
273
- def test_stop_all_processes_with_no_running_processes (subprocess_manager ):
279
+ def test_stop_all_processes_with_no_running_processes (subprocess_manager ): # pylint: disable=redefined-outer-name
274
280
'''Test that stop_all_processes does not raise errors when all process slots are None'''
275
281
manager = subprocess_manager
276
282
@@ -286,7 +292,7 @@ def test_stop_all_processes_with_no_running_processes(subprocess_manager):
286
292
assert manager .processes [name ]['process' ] is None
287
293
288
294
289
- def test_stop_all_processes_signals_all_first (subprocess_manager ):
295
+ def test_stop_all_processes_signals_all_first (subprocess_manager ): # pylint: disable=redefined-outer-name
290
296
''' Test that stop_all_processes signals all processes before joining '''
291
297
manager = subprocess_manager
292
298
@@ -299,7 +305,10 @@ def test_stop_all_processes_signals_all_first(subprocess_manager):
299
305
300
306
# Track when stopevent.set() is called
301
307
original_set = manager .processes [name ]['stopevent' ].set
302
- manager .processes [name ]['stopevent' ].set = lambda n = name : signal_order .append (n ) or original_set ()
308
+ # Create closure to capture name properly
309
+ def make_set_wrapper (process_name , orig_set ):
310
+ return lambda : signal_order .append (process_name ) or orig_set ()
311
+ manager .processes [name ]['stopevent' ].set = make_set_wrapper (name , original_set )
303
312
304
313
manager .stop_all_processes ()
305
314
@@ -310,7 +319,7 @@ def test_stop_all_processes_signals_all_first(subprocess_manager):
310
319
assert expected in signal_order
311
320
312
321
313
- def test_legacy_methods (subprocess_manager ):
322
+ def test_legacy_methods (subprocess_manager ): # pylint: disable=redefined-outer-name
314
323
''' Test that legacy methods still work '''
315
324
manager = subprocess_manager
316
325
@@ -343,7 +352,7 @@ def test_legacy_methods(subprocess_manager):
343
352
344
353
345
354
@pytest .mark .skipif (sys .platform != "win32" , reason = "Windows-specific timeout test" )
346
- def test_windows_process_termination_timeout (subprocess_manager ):
355
+ def test_windows_process_termination_timeout (subprocess_manager ): # pylint: disable=redefined-outer-name
347
356
''' Test that Windows gets longer termination timeouts '''
348
357
manager = subprocess_manager
349
358
@@ -359,7 +368,7 @@ def join(self, timeout=None):
359
368
if self .join_call_count == 1 :
360
369
return # Process stays alive
361
370
# Second call (after terminate) - now it dies with Windows timeout
362
- elif self .join_call_count == 2 and timeout and timeout >= 7 :
371
+ if self .join_call_count == 2 and timeout and timeout >= 7 :
363
372
self ._alive = False
364
373
365
374
mock_process = WindowsSlowProcess (name = 'trackpoll' )
@@ -368,7 +377,7 @@ def join(self, timeout=None):
368
377
with patch .object (mock_process , 'join' , wraps = mock_process .join ) as mock_join , \
369
378
patch .object (mock_process , 'terminate' ) as mock_terminate :
370
379
371
- manager ._stop_process_parallel ('trackpoll' )
380
+ manager ._stop_process_parallel ('trackpoll' ) # pylint: disable=protected-access
372
381
373
382
# Should call join twice: first for graceful (8s), then for force (7s)
374
383
assert mock_join .call_count == 2
@@ -378,7 +387,7 @@ def join(self, timeout=None):
378
387
mock_join .assert_any_call (7 )
379
388
380
389
381
- def test_cross_platform_timeout_behavior (subprocess_manager ):
390
+ def test_cross_platform_timeout_behavior (subprocess_manager ): # pylint: disable=redefined-outer-name
382
391
''' Test timeout behavior across platforms '''
383
392
manager = subprocess_manager
384
393
@@ -388,10 +397,10 @@ def test_cross_platform_timeout_behavior(subprocess_manager):
388
397
with patch .object (mock_process , 'join' ) as mock_join :
389
398
mock_join .side_effect = lambda timeout : setattr (mock_process , '_alive' , False )
390
399
391
- manager ._stop_process_parallel ('webserver' )
400
+ manager ._stop_process_parallel ('webserver' ) # pylint: disable=protected-access
392
401
393
402
# Should use 8 second graceful timeout on all platforms
394
403
mock_join .assert_any_call (8 )
395
404
396
405
# If termination was needed, should use 7 second timeout
397
- # (increased from 5 to accommodate Windows)
406
+ # (increased from 5 to accommodate Windows)
0 commit comments