diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2012-04-27 23:51:03 +0200 |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2012-04-27 23:51:03 +0200 |
commit | 92ff4e196bfd5361f231ab8629025d28af1decab (patch) | |
tree | 238c48b79e5f733e80ce37ac4d78c90a4910e47a /Lib/multiprocessing/reduction.py | |
parent | d0880d57b053179a8dd91f2b6fbcb5b5ddf56a1d (diff) | |
download | cpython-git-92ff4e196bfd5361f231ab8629025d28af1decab.tar.gz |
Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done.
Also, block delivery of signals to that thread. Patch by Richard Oudkerk.
This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot.
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
-rw-r--r-- | Lib/multiprocessing/reduction.py | 29 |
1 files changed, 28 insertions, 1 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index ce38fe367e..cef445b4d8 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -40,6 +40,7 @@ import sys import socket import threading import struct +import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug @@ -209,6 +210,7 @@ class ResourceSharer(object): self._lock = threading.Lock() self._listener = None self._address = None + self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): @@ -227,6 +229,24 @@ class ResourceSharer(object): c.send((key, os.getpid())) return c + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + def _afterfork(self): for key, (send, close) in self._cache.items(): close() @@ -239,6 +259,7 @@ class ResourceSharer(object): self._listener.close() self._listener = None self._address = None + self._thread = None def _start(self): from .connection import Listener @@ -249,12 +270,18 @@ class ResourceSharer(object): t = threading.Thread(target=self._serve) t.daemon = True t.start() + self._thread = t def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() - key, destination_pid = conn.recv() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() |