summaryrefslogtreecommitdiff
path: root/Lib/concurrent/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r--Lib/concurrent/futures/process.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7f31ec2263..04238a7ace 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,8 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady, Full
+from multiprocessing.queues import SimpleQueue, Full
+from multiprocessing.connection import wait
import threading
import weakref
@@ -212,6 +213,8 @@ def _queue_management_worker(executor_reference,
for p in processes.values():
p.join()
+ reader = result_queue._reader
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
@@ -219,9 +222,10 @@ def _queue_management_worker(executor_reference,
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
- try:
- result_item = result_queue.get(sentinels=sentinels)
- except SentinelReady:
+ ready = wait([reader] + sentinels)
+ if reader in ready:
+ result_item = reader.recv()
+ else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None: