diff options
Diffstat (limited to 'Lib/concurrent/futures/thread.py')
-rw-r--r-- | Lib/concurrent/futures/thread.py | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 299b94a77a..93b495f55a 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -25,14 +25,18 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads finish. -_live_threads = weakref.WeakSet() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread in _live_threads: - thread.join() + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + atexit.register(_python_exit) class _WorkItem(object): @@ -57,18 +61,23 @@ def _worker(executor_reference, work_queue): try: while True: try: - work_item = work_queue.get(block=True, timeout=0.1) + work_item = work_queue.get(block=True) except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor + pass else: - work_item.run() + if work_item is not None: + work_item.run() + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor except BaseException as e: _base.LOGGER.critical('Exception in worker', exc_info=True) @@ -100,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) + args=(weakref.ref(self, weakref_cb), + self._work_queue)) t.daemon = True t.start() self._threads.add(t) - _live_threads.add(t) + _threads_queues[t] = self._work_queue def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True + self._work_queue.put(None) if wait: for t in self._threads: t.join() |