Skip to content

Commit 2cf31fd

Browse files
committed
Making process manipulation safer
1 parent 9e4784a commit 2cf31fd

1 file changed

Lines changed: 37 additions & 13 deletions

File tree

giskard/utils/worker_pool.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,40 @@ def _generate_task_id():
2424
return str(uuid4())
2525

2626

27+
def _safe_is_alive(p: Process) -> bool:
28+
try:
29+
return p.is_alive()
30+
except ValueError:
31+
return False
32+
33+
34+
def _safe_exit_code(p: Process) -> int:
35+
try:
36+
return p.exitcode
37+
except ValueError:
38+
return -1
39+
40+
2741
def _wait_process_stop(p_list: List[Process], timeout: float = 1):
2842
end_time = time.monotonic() + timeout
29-
while any([p.is_alive() for p in p_list]) and time.monotonic() < end_time:
43+
while any([_safe_is_alive(p) for p in p_list]) and time.monotonic() < end_time:
3044
sleep(0.1)
3145

3246

3347
def _stop_processes(p_list: List[Process], timeout: float = 1) -> List[Optional[int]]:
3448
# Check if process is alive.
3549
for p in p_list:
36-
if p.is_alive():
50+
if _safe_is_alive(p):
3751
# Try to terminate with SIGTERM first
3852
p.terminate()
3953
_wait_process_stop(p_list, timeout=timeout)
4054

4155
for p in p_list:
42-
if p.is_alive():
56+
if _safe_is_alive(p):
4357
# If still alive, kill the processes
4458
p.kill()
4559
_wait_process_stop(p_list, timeout=2)
46-
exit_codes = [p.exitcode for p in p_list]
60+
exit_codes = [_safe_exit_code(p) for p in p_list]
4761
# Free all resources
4862
for p in p_list:
4963
p.close()
@@ -163,9 +177,12 @@ def __init__(self, nb_workers: Optional[int] = None, name: Optional[str] = None)
163177
LOGGER.info("WorkerPoolExecutor is started")
164178

165179
def health_check(self):
166-
if any([not p.is_alive() for p in self._processes.values()]):
180+
if self._state in FINAL_STATES:
181+
return
182+
if any([not _safe_is_alive(p) for p in self._processes.values()]):
167183
LOGGER.warning("At least one process died for an unknown reason, marking pool as broken")
168184
self._state = PoolState.BROKEN
185+
self.shutdown(wait=False, timeout=1)
169186

170187
def _spawn_worker(self):
171188
# Daemon means process are linked to main one, and will be stopped if current process is stopped
@@ -203,12 +220,13 @@ def schedule(
203220
self._with_timeout_tasks.append(TimeoutData(task.id, time.monotonic() + timeout))
204221
return res
205222

206-
def shutdown(self, wait=True, timeout: float = 5):
207-
if self._state in FINAL_STATES:
223+
def shutdown(self, wait=True, timeout: float = 5, force=False):
224+
if self._state in FINAL_STATES and not force:
208225
return
209226
# Changing state, so that thread will stop
210227
# Killer thread will also do cleanup
211-
self._state = PoolState.STOPPING
228+
if not force:
229+
self._state = PoolState.STOPPING
212230
# Cancelling all futures we have
213231
for future in self._futures_mapping.values():
214232
if future.cancel() and not future.done():
@@ -226,13 +244,17 @@ def shutdown(self, wait=True, timeout: float = 5):
226244
LOGGER.warning("Queue was empty, skipping")
227245
LOGGER.exception(e)
228246
# Try to nicely stop the worker, by adding None into the running tasks
229-
for _ in range(self._nb_workers):
230-
self._running_tasks_queue.put(None, timeout=1)
247+
try:
248+
for _ in range(self._nb_workers):
249+
self._running_tasks_queue.put(None, timeout=1)
250+
except OSError as e:
251+
# This happens if queues is closed
252+
LOGGER.warning("Running task queue is already closed")
253+
LOGGER.exception(e)
231254
# Wait for process to stop by themselves
232255
p_list = list(self._processes.values())
233256
if wait:
234257
_wait_process_stop(p_list, timeout=timeout)
235-
236258
# Clean all the queues
237259
for queue in [self._pending_tasks_queue, self._tasks_results, self._running_tasks_queue]:
238260
# In python 3.8, Simple queue seems to not have close method
@@ -318,7 +340,8 @@ def _killer_thread(
318340
while len(executor._with_timeout_tasks) == 0 and executor._state not in FINAL_STATES:
319341
# No need to be too active
320342
sleep(1)
321-
executor.health_check()
343+
if executor._state not in FINAL_STATES:
344+
executor.health_check()
322345
if executor._state in FINAL_STATES:
323346
return
324347

@@ -339,7 +362,8 @@ def _killer_thread(
339362
if pid is not None:
340363
p = executor._processes.pop(pid)
341364
_stop_processes([p])
342-
executor._spawn_worker()
365+
if executor._state not in FINAL_STATES:
366+
executor._spawn_worker()
343367
except BaseException as e:
344368
# This is probably an OSError, but we want to be extra safe
345369
LOGGER.warning("Unexpected error when killing a timed out process, pool is broken")

0 commit comments

Comments
 (0)