summaryrefslogtreecommitdiff
path: root/Lib/concurrent/futures/thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/thread.py')
-rw-r--r--Lib/concurrent/futures/thread.py45
1 files changed, 30 insertions, 15 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 299b94a77a..93b495f55a 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -25,14 +25,18 @@ import weakref
# workers to exit when their work queues are empty and then waits until the
# threads finish.
-_live_threads = weakref.WeakSet()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread in _live_threads:
- thread.join()
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
+
atexit.register(_python_exit)
class _WorkItem(object):
@@ -57,18 +61,23 @@ def _worker(executor_reference, work_queue):
try:
while True:
try:
- work_item = work_queue.get(block=True, timeout=0.1)
+ work_item = work_queue.get(block=True)
except queue.Empty:
- executor = executor_reference()
- # Exit if:
- # - The interpreter is shutting down OR
- # - The executor that owns the worker has been collected OR
- # - The executor that owns the worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown:
- return
- del executor
+ pass
else:
- work_item.run()
+ if work_item is not None:
+ work_item.run()
+ continue
+ executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
+ # - The executor that owns the worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown:
+ # Notice other workers
+ work_queue.put(None)
+ return
+ del executor
except BaseException as e:
_base.LOGGER.critical('Exception in worker', exc_info=True)
@@ -100,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the worker threads.
+ def weakref_cb(_, q=self._work_queue):
+ q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
- args=(weakref.ref(self), self._work_queue))
+ args=(weakref.ref(self, weakref_cb),
+ self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
- _live_threads.add(t)
+ _threads_queues[t] = self._work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
+ self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()