GSK-1895 New worker pool #1478
bd58989 to a3b8bde
andreybavt left a comment
A couple of general comments:
- The queue names are prefixed with _ but are referenced from outside the executor, for example: executor._pending_tasks_queue. I think it's OK if they become public; we're using them this way anyway.
- There are still untested corner cases, like "Future has been cancelled already". It would be great to increase code coverage even more, considering that this is a core part of the ML Worker.
| while not self._running_tasks_queue.empty(): | ||
| try: | ||
| self._running_tasks_queue.get_nowait() | ||
| except BaseException: |
should we log them at least?
Yeah, probably. Plus I should reduce the exception scope.
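A minimal sketch of what a narrower, logged version of the drain loop could look like. The helper name `drain_queue` and the logger are hypothetical, not taken from the PR. It loops until `queue.Empty` instead of trusting `empty()`, which the multiprocessing documentation describes as unreliable.

```python
import logging
import queue

LOGGER = logging.getLogger(__name__)


def drain_queue(q) -> int:
    """Remove every item from q without blocking; return how many were dropped.

    Hypothetical helper: works with queue.Queue and multiprocessing queues,
    which both raise queue.Empty from get_nowait().
    """
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            # Normal exit: nothing left to drain.
            break
        except (OSError, ValueError) as e:
            # A closed or broken queue is worth knowing about, but not fatal here.
            LOGGER.warning("Could not drain queue: %s", e)
            break
        dropped += 1
    return dropped
```

Catching only these exceptions lets `KeyboardInterrupt` and `SystemExit` propagate, which `except BaseException` would swallow.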
| return exit_codes | ||
| def _results_thread( |
From what I understand, you're waking up frequently to check whether the whole pool is in one of the FINAL_STATES. Wouldn't it be better to call executor._tasks_results.get(block=True, timeout=2) with a larger timeout and, after catching the Empty exception, check whether the pool is in one of the FINAL_STATES?
In this case there'll be less busy waiting.
Yeah I guess we could. I'm not a big fan of using try except as an if, but it may be better in this case.
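For illustration, the suggested loop could be sketched roughly as follows. `PoolState`, `results_loop`, `get_state` and `handle_result` are hypothetical stand-ins, and the real FINAL_STATES in the PR may contain different members.

```python
import queue
from enum import Enum


class PoolState(Enum):  # hypothetical stand-in for the pool's state enum
    STARTED = 1
    STOPPED = 2
    BROKEN = 3


FINAL_STATES = {PoolState.STOPPED, PoolState.BROKEN}  # assumed content


def results_loop(results_queue, get_state, handle_result, timeout=2.0):
    """Block on the results queue; only check the pool state when it is idle."""
    while True:
        try:
            result = results_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            # No result arrived within the timeout: a cheap moment to check for shutdown.
            if get_state() in FINAL_STATES:
                return
            continue
        handle_result(result)
```

The thread then sleeps inside `get()` and wakes either for a result or once per timeout, at the cost of shutdown being noticed up to `timeout` seconds late.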
| future.set_result(result.result) | ||
| else: | ||
| # TODO(Bazire): improve to get Traceback | ||
| future.set_exception(RuntimeError(result.exception)) |
why can't it be directly future.set_exception(result.exception)?
The exception is a string in this case (the exception + traceback), because it needs to be picklable, and exceptions are linked to many objects and have circular references.
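A small sketch of the pattern described in this reply, under assumed names: `TaskResult`, `run_task` and `resolve` are hypothetical and do not come from the PR. The worker side formats the exception and traceback into a plain string, and the parent side wraps that string in a RuntimeError.

```python
import traceback
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskResult:  # hypothetical shape of the object sent back by a worker
    id: str
    result: Any = None
    exception: Optional[str] = None  # formatted traceback, a plain string


def run_task(task_id, fn, *args):
    """Worker side: never ship the exception object, only its text."""
    try:
        return TaskResult(id=task_id, result=fn(*args))
    except Exception:
        return TaskResult(id=task_id, exception=traceback.format_exc())


def resolve(future: Future, result: TaskResult) -> None:
    """Parent side: rebuild an exception from the string."""
    if result.exception is None:
        future.set_result(result.result)
    else:
        future.set_exception(RuntimeError(result.exception))
```

The original exception type is lost on the parent side, which is the limitation the TODO in the diff refers to.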
| try: | ||
| del executor._futures_mapping[result.id] | ||
| except BaseException: | ||
| pass |
same as above, we need to know if something does not go as planned
Yup, I will add some warning logs
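One possible shape for this, assuming the mapping behaves like a regular dict. `forget_future` and the log message are hypothetical.

```python
import logging

LOGGER = logging.getLogger(__name__)


def forget_future(futures_mapping, task_id):
    """Remove the future for task_id and warn if it was already gone."""
    future = futures_mapping.pop(task_id, None)
    if future is None:
        # Not fatal, but it means a result arrived for a task we no longer track.
        LOGGER.warning("No future registered for task %s", task_id)
    return future
```

Using `pop` with a default avoids the try/except entirely, so there is no exception left to scope.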
| future = executor._futures_mapping.get(task.id) | ||
| if future is not None and future.set_running_or_notify_cancel(): | ||
| executor._running_tasks_queue.put(task) | ||
| elif future is not None: |
you're checking for future is not None twice. I think it'll be simpler to have this check first, and then a second if under it to check whether the future is cancelled or not
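A sketch of the suggested restructuring, written as a standalone function for clarity. `dispatch` is a hypothetical name, and since the body of the cancelled branch is not visible in the excerpt, dropping the mapping entry there is only an assumption.

```python
import queue
from concurrent.futures import Future


def dispatch(task_id, task, futures_mapping, running_queue) -> bool:
    """Return True if the task was handed to the running queue."""
    future = futures_mapping.get(task_id)
    if future is None:
        # Unknown task: nothing to run, nothing to clean up.
        return False
    if future.set_running_or_notify_cancel():
        running_queue.put(task)
        return True
    # Future was cancelled before it started (assumed cleanup: forget it).
    del futures_mapping[task_id]
    return False
```

Written this way, the "Future has been cancelled already" case also becomes easy to cover in a unit test with a plain `Future` that has been cancelled.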
| # Mapping of the running tasks and worker pids | ||
| self._running_process: Dict[str, str] = self._manager.dict() | ||
| # Mapping of the running tasks and worker pids | ||
| self._with_timeout_tasks: List[Tuple[str, float]] = [] |
nitpick: can we use a dataclass here instead of a Tuple? It'll be more maintainable since we won't have to remember what the parameters were
Actually, I created the dataclass and forgot to use it 😆 😿
Done !
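For readers who do not have the diff at hand, the idea looks roughly like this. The class name `TimeoutData`, its fields, and the reading of the float as an absolute deadline are all assumptions; the dataclass actually used in the PR is not shown in this conversation.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class TimeoutData:  # hypothetical name and fields
    id: str     # task id
    end: float  # assumed: absolute deadline, e.g. from time.monotonic()


def expired(tasks: List[TimeoutData], now: float) -> List[TimeoutData]:
    """Return the tasks whose deadline has passed."""
    return [t for t in tasks if t.end <= now]
```

Compared with `Tuple[str, float]`, call sites read `t.end` instead of `t[1]`, so the meaning of each position no longer has to be remembered.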
Also, let's get the Sonar code smells fixed too (at least the ones not related to BaseException).
| self._pending_tasks_queue: SimpleQueue[GiskardTask] = self._mp_context.SimpleQueue() | ||
| # Queue with tasks to run | ||
| # As in ProcessPool, add one more to avoid idling process | ||
| self._running_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue(maxsize=self._nb_workers + 1) |
The way you use it, it should rather be Queue[Optional[GiskardTask]]
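The Optional matters when None is put on the queue alongside real tasks. The sketch below assumes None is used as a stop sentinel, which is a common pattern but is not confirmed by the excerpt; `worker_loop` and the str task type are hypothetical.

```python
import queue
from typing import Callable, Optional


def worker_loop(tasks: "queue.Queue[Optional[str]]", handle: Callable[[str], None]) -> int:
    """Process tasks until a None sentinel arrives; return how many were handled."""
    done = 0
    while True:
        task = tasks.get()
        if task is None:
            # Sentinel: the type checker only allows this comparison to be
            # meaningful because the queue is declared with Optional[...].
            return done
        handle(task)
        done += 1
```

With the annotation `Queue[GiskardTask]`, a type checker would flag `put(None)` and treat the `is None` branch as unreachable.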
df6558d to 9e4784a
Hartorn left a comment
I took all comments into account.
Also, I improved the threads, especially the killer one.
I added a quick health_check too.
2cf31fd to 0976146
227cd39 to ed598a6
ed598a6 to 62c72c4
3b2db1c to c53635d
f7b8730 to 331b11e
# Conflicts: # .github/workflows/build-python.yml # pdm.lock # pyproject.toml
Kudos, SonarCloud Quality Gate passed!
Description
Re-implement a worker pool, with the possibility of a timeout.
Also add the capacity to capture all logging for executed tasks.
I removed the Python deps and related code, because it was a bother for me when testing under Windows.