diff options
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 97 |
1 files changed, 64 insertions, 33 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 44f8504cca..a899d5fca8 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -66,14 +66,17 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_live_threads = weakref.WeakSet() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread in _live_threads: - thread.join() + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -116,11 +119,15 @@ def _process_worker(call_queue, result_queue, shutdown): """ while True: try: - call_item = call_queue.get(block=True, timeout=0.1) + call_item = call_queue.get(block=True) except queue.Empty: if shutdown.is_set(): return else: + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: @@ -195,40 +202,56 @@ def _queue_manangement_worker(executor_reference, process workers that they should exit when their work queue is empty. """ + nb_shutdown_processes = 0 + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + nonlocal nb_shutdown_processes + call_queue.put(None) + nb_shutdown_processes += 1 while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) try: - result_item = result_queue.get(block=True, timeout=0.1) + result_item = result_queue.get(block=True) except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor + pass else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + continue + # If we come here, we either got a timeout or were explicitly woken up. + # In either case, check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() + + while nb_shutdown_processes < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return else: - work_item.future.set_result(result_item.result) + # Start shutting down by telling a process it can exit. + shutdown_one_process() + del executor _system_limits_checked = False _system_limited = None @@ -289,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items = {} def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( target=_queue_manangement_worker, - args=(weakref.ref(self), + args=(weakref.ref(self, weakref_cb), self._processes, self._pending_work_items, self._work_ids, @@ -301,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor): self._shutdown_process_event)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _live_threads.add(self._queue_management_thread) + _threads_queues[self._queue_management_thread] = self._result_queue def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -324,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) self._start_queue_management_thread() self._adjust_process_count() @@ -333,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor): def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True - if wait: - if self._queue_management_thread: + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: self._queue_management_thread.join() # To reduce the risk of openning too many files, remove references to # objects that use file descriptors. |