diff options
Diffstat (limited to 'Lib/multiprocessing')
| -rw-r--r-- | Lib/multiprocessing/connection.py | 8 | ||||
| -rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 5 | ||||
| -rw-r--r-- | Lib/multiprocessing/forkserver.py | 12 | ||||
| -rw-r--r-- | Lib/multiprocessing/heap.py | 15 | ||||
| -rw-r--r-- | Lib/multiprocessing/managers.py | 54 | ||||
| -rw-r--r-- | Lib/multiprocessing/pool.py | 28 | ||||
| -rw-r--r-- | Lib/multiprocessing/popen_fork.py | 2 | ||||
| -rw-r--r-- | Lib/multiprocessing/queues.py | 6 | ||||
| -rw-r--r-- | Lib/multiprocessing/reduction.py | 5 | ||||
| -rw-r--r-- | Lib/multiprocessing/resource_sharer.py | 2 | ||||
| -rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 3 | ||||
| -rw-r--r-- | Lib/multiprocessing/spawn.py | 2 | ||||
| -rw-r--r-- | Lib/multiprocessing/synchronize.py | 7 | ||||
| -rw-r--r-- | Lib/multiprocessing/util.py | 9 | 
14 files changed, 119 insertions, 39 deletions
| diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index ba9b17cee1..7a621a55f4 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -720,7 +720,9 @@ FAILURE = b'#FAILURE#'  def deliver_challenge(connection, authkey):      import hmac -    assert isinstance(authkey, bytes) +    if not isinstance(authkey, bytes): +        raise ValueError( +            "Authkey must be bytes, not {0!s}".format(type(authkey)))      message = os.urandom(MESSAGE_LENGTH)      connection.send_bytes(CHALLENGE + message)      digest = hmac.new(authkey, message, 'md5').digest() @@ -733,7 +735,9 @@ def deliver_challenge(connection, authkey):  def answer_challenge(connection, authkey):      import hmac -    assert isinstance(authkey, bytes) +    if not isinstance(authkey, bytes): +        raise ValueError( +            "Authkey must be bytes, not {0!s}".format(type(authkey)))      message = connection.recv_bytes(256)         # reject large message      assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message      message = message[len(CHALLENGE):] diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index cbb7f4909c..403f5e5198 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -41,7 +41,10 @@ class DummyProcess(threading.Thread):          self._parent = current_process()      def start(self): -        assert self._parent is current_process() +        if self._parent is not current_process(): +            raise RuntimeError( +                "Parent is {0!r} but current_process is {1!r}".format( +                    self._parent, current_process()))          self._start_called = True          if hasattr(self._parent, '_children'):              self._parent._children[self] = None diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 69b842aa93..7a952e2ba6 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -189,7 +189,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):                  if alive_r in rfds:                      # EOF because no more client processes left -                    assert os.read(alive_r, 1) == b'' +                    assert os.read(alive_r, 1) == b'', "Not at EOF?"                      raise SystemExit                  if sig_r in rfds: @@ -208,7 +208,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):                              if os.WIFSIGNALED(sts):                                  returncode = -os.WTERMSIG(sts)                              else: -                                assert os.WIFEXITED(sts) +                                if not os.WIFEXITED(sts): +                                    raise AssertionError( +                                        "Child {0:n} status is {1:n}".format( +                                            pid,sts))                                  returncode = os.WEXITSTATUS(sts)                              # Send exit code to client process                              try: @@ -227,7 +230,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):                      with listener.accept()[0] as s:                          # Receive fds from client                          fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) -                        assert len(fds) <= MAXFDS_TO_SEND +                        if len(fds) > MAXFDS_TO_SEND: +                            raise RuntimeError( +                                "Too many ({0:n}) fds to send".format( +                                    len(fds)))                          child_r, child_w, *fds = fds                          s.close()                          pid = os.fork() diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index ee3ed551d0..566173a1b0 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -211,7 +211,10 @@ class Heap(object):          # synchronously sometimes later from malloc() or free(), by calling          # _free_pending_blocks() (appending and retrieving from a list is not          # strictly thread-safe but under cPython it's atomic thanks to the GIL). -        assert os.getpid() == self._lastpid +        if os.getpid() != self._lastpid: +            raise ValueError( +                "My pid ({0:n}) is not last pid {1:n}".format( +                    os.getpid(),self._lastpid))          if not self._lock.acquire(False):              # can't acquire the lock right now, add the block to the list of              # pending blocks to free @@ -227,7 +230,10 @@ class Heap(object):      def malloc(self, size):          # return a block of right size (possibly rounded up) -        assert 0 <= size < sys.maxsize +        if size < 0: +            raise ValueError("Size {0:n} out of range".format(size)) +        if sys.maxsize <= size: +            raise OverflowError("Size {0:n} too large".format(size))          if os.getpid() != self._lastpid:              self.__init__()                     # reinitialize after fork          with self._lock: @@ -250,7 +256,10 @@ class BufferWrapper(object):      _heap = Heap()      def __init__(self, size): -        assert 0 <= size < sys.maxsize +        if size < 0: +            raise ValueError("Size {0:n} out of range".format(size)) +        if sys.maxsize <= size: +            raise OverflowError("Size {0:n} too large".format(size))          block = BufferWrapper._heap.malloc(size)          self._state = (block, size)          util.Finalize(self, BufferWrapper._heap.free, args=(block,)) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index c6722771b0..04df26bac6 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -23,7 +23,7 @@ from time import time as _time  from traceback import format_exc  from . import connection -from .context import reduction, get_spawning_popen +from .context import reduction, get_spawning_popen, ProcessError  from . import pool  from . import process  from . import util @@ -133,7 +133,10 @@ class Server(object):                'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']      def __init__(self, registry, address, authkey, serializer): -        assert isinstance(authkey, bytes) +        if not isinstance(authkey, bytes): +            raise TypeError( +                "Authkey {0!r} is type {1!s}, not bytes".format( +                    authkey, type(authkey)))          self.registry = registry          self.authkey = process.AuthenticationString(authkey)          Listener, Client = listener_client[serializer] @@ -163,7 +166,7 @@ class Server(object):              except (KeyboardInterrupt, SystemExit):                  pass          finally: -            if sys.stdout != sys.__stdout__: +            if sys.stdout != sys.__stdout__: # what about stderr?                  util.debug('resetting stdout, stderr')                  sys.stdout = sys.__stdout__                  sys.stderr = sys.__stderr__ @@ -316,6 +319,7 @@ class Server(object):          '''          Return some info --- useful to spot problems with refcounting          ''' +        # Perhaps include debug info about 'c'?          with self.mutex:              result = []              keys = list(self.id_to_refcount.keys()) @@ -356,7 +360,9 @@ class Server(object):                        self.registry[typeid]              if callable is None: -                assert len(args) == 1 and not kwds +                if kwds or (len(args) != 1): +                    raise ValueError( +                        "Without callable, must have one non-keyword argument")                  obj = args[0]              else:                  obj = callable(*args, **kwds) @@ -364,7 +370,10 @@ class Server(object):              if exposed is None:                  exposed = public_methods(obj)              if method_to_typeid is not None: -                assert type(method_to_typeid) is dict +                if not isinstance(method_to_typeid, dict): +                    raise TypeError( +                        "Method_to_typeid {0!r}: type {1!s}, not dict".format( +                            method_to_typeid, type(method_to_typeid)))                  exposed = list(exposed) + list(method_to_typeid)              ident = '%x' % id(obj)  # convert to string because xmlrpclib @@ -417,7 +426,11 @@ class Server(object):              return          with self.mutex: -            assert self.id_to_refcount[ident] >= 1 +            if self.id_to_refcount[ident] <= 0: +                raise AssertionError( +                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( +                        ident, self.id_to_obj[ident], +                        self.id_to_refcount[ident]))              self.id_to_refcount[ident] -= 1              if self.id_to_refcount[ident] == 0:                  del self.id_to_refcount[ident] @@ -480,7 +493,14 @@ class BaseManager(object):          '''          Return server object with serve_forever() method and address attribute          ''' -        assert self._state.value == State.INITIAL +        if self._state.value != State.INITIAL: +            if self._state.value == State.STARTED: +                raise ProcessError("Already started server") +            elif self._state.value == State.SHUTDOWN: +                raise ProcessError("Manager has shut down") +            else: +                raise ProcessError( +                    "Unknown state {!r}".format(self._state.value))          return Server(self._registry, self._address,                        self._authkey, self._serializer) @@ -497,7 +517,14 @@ class BaseManager(object):          '''          Spawn a server process for this manager object          ''' -        assert self._state.value == State.INITIAL +        if self._state.value != State.INITIAL: +            if self._state.value == State.STARTED: +                raise ProcessError("Already started server") +            elif self._state.value == State.SHUTDOWN: +                raise ProcessError("Manager has shut down") +            else: +                raise ProcessError( +                    "Unknown state {!r}".format(self._state.value))          if initializer is not None and not callable(initializer):              raise TypeError('initializer must be a callable') @@ -593,7 +620,14 @@ class BaseManager(object):      def __enter__(self):          if self._state.value == State.INITIAL:              self.start() -        assert self._state.value == State.STARTED +        if self._state.value != State.STARTED: +            if self._state.value == State.INITIAL: +                raise ProcessError("Unable to start server") +            elif self._state.value == State.SHUTDOWN: +                raise ProcessError("Manager has shut down") +            else: +                raise ProcessError( +                    "Unknown state {!r}".format(self._state.value))          return self      def __exit__(self, exc_type, exc_val, exc_tb): @@ -653,7 +687,7 @@ class BaseManager(object):                             getattr(proxytype, '_method_to_typeid_', None)          if method_to_typeid: -            for key, value in list(method_to_typeid.items()): +            for key, value in list(method_to_typeid.items()): # isinstance?                  assert type(key) is str, '%r is not a string' % key                  assert type(value) is str, '%r is not a string' % value diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index c2364ab186..e457f0a576 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -92,7 +92,9 @@ class MaybeEncodingError(Exception):  def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,             wrap_exception=False): -    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) +    if (maxtasks is not None) and not (isinstance(maxtasks, int) +                                       and maxtasks >= 1): +        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))      put = outqueue.put      get = inqueue.get      if hasattr(inqueue, '_writer'): @@ -254,8 +256,8 @@ class Pool(object):      def apply(self, func, args=(), kwds={}):          '''          Equivalent of `func(*args, **kwds)`. +        Pool must be running.          ''' -        assert self._state == RUN          return self.apply_async(func, args, kwds).get()      def map(self, func, iterable, chunksize=None): @@ -307,6 +309,10 @@ class Pool(object):                  ))              return result          else: +            if chunksize < 1: +                raise ValueError( +                    "Chunksize must be 1+, not {0:n}".format( +                        chunksize))              assert chunksize > 1              task_batches = Pool._get_tasks(func, iterable, chunksize)              result = IMapIterator(self._cache) @@ -334,7 +340,9 @@ class Pool(object):                  ))              return result          else: -            assert chunksize > 1 +            if chunksize < 1: +                raise ValueError( +                    "Chunksize must be 1+, not {0!r}".format(chunksize))              task_batches = Pool._get_tasks(func, iterable, chunksize)              result = IMapUnorderedIterator(self._cache)              self._taskqueue.put( @@ -466,7 +474,7 @@ class Pool(object):                  return              if thread._state: -                assert thread._state == TERMINATE +                assert thread._state == TERMINATE, "Thread not in TERMINATE"                  util.debug('result handler found thread._state=TERMINATE')                  break @@ -542,7 +550,10 @@ class Pool(object):      def join(self):          util.debug('joining pool') -        assert self._state in (CLOSE, TERMINATE) +        if self._state == RUN: +            raise ValueError("Pool is still running") +        elif self._state not in (CLOSE, TERMINATE): +            raise ValueError("In unknown state")          self._worker_handler.join()          self._task_handler.join()          self._result_handler.join() @@ -570,7 +581,9 @@ class Pool(object):          util.debug('helping task handler/workers to finish')          cls._help_stuff_finish(inqueue, task_handler, len(pool)) -        assert result_handler.is_alive() or len(cache) == 0 +        if (not result_handler.is_alive()) and (len(cache) != 0): +            raise AssertionError( +                "Cannot have cache with result_hander not alive")          result_handler._state = TERMINATE          outqueue.put(None)                  # sentinel @@ -628,7 +641,8 @@ class ApplyResult(object):          return self._event.is_set()      def successful(self): -        assert self.ready() +        if not self.ready(): +            raise ValueError("{0!r} not ready".format(self))          return self._success      def wait(self, timeout=None): diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 44ce9a9e79..cbdbfa79cf 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -35,7 +35,7 @@ class Popen(object):                  if os.WIFSIGNALED(sts):                      self.returncode = -os.WTERMSIG(sts)                  else: -                    assert os.WIFEXITED(sts) +                    assert os.WIFEXITED(sts), "Status is {:n}".format(sts)                      self.returncode = os.WEXITSTATUS(sts)          return self.returncode diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 513807cafe..328efbd95f 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -78,7 +78,7 @@ class Queue(object):          self._poll = self._reader.poll      def put(self, obj, block=True, timeout=None): -        assert not self._closed +        assert not self._closed, "Queue {0!r} has been closed".format(self)          if not self._sem.acquire(block, timeout):              raise Full @@ -140,7 +140,7 @@ class Queue(object):      def join_thread(self):          debug('Queue.join_thread()') -        assert self._closed +        assert self._closed, "Queue {0!r} not closed".format(self)          if self._jointhread:              self._jointhread() @@ -281,7 +281,7 @@ class JoinableQueue(Queue):          self._cond, self._unfinished_tasks = state[-2:]      def put(self, obj, block=True, timeout=None): -        assert not self._closed +        assert not self._closed, "Queue {0!r} is closed".format(self)          if not self._sem.acquire(block, timeout):              raise Full diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 7f65947379..deca19ccad 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -165,7 +165,10 @@ else:                  if len(cmsg_data) % a.itemsize != 0:                      raise ValueError                  a.frombytes(cmsg_data) -                assert len(a) % 256 == msg[0] +                if len(a) % 256 != msg[0]: +                    raise AssertionError( +                        "Len is {0:n} but msg[0] is {1!r}".format( +                            len(a), msg[0]))                  return list(a)          except (ValueError, IndexError):              pass diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py index e44a728fa9..6d99da102f 100644 --- a/Lib/multiprocessing/resource_sharer.py +++ b/Lib/multiprocessing/resource_sharer.py @@ -125,7 +125,7 @@ class _ResourceSharer(object):      def _start(self):          from .connection import Listener -        assert self._listener is None +        assert self._listener is None, "Already have Listener"          util.debug('starting listener and thread for sending handles')          self._listener = Listener(authkey=process.current_process().authkey)          self._address = self._listener.address diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index de7738eeee..d5f259c246 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -80,7 +80,8 @@ class SemaphoreTracker(object):              # bytes are atomic, and that PIPE_BUF >= 512              raise ValueError('name too long')          nbytes = os.write(self._fd, msg) -        assert nbytes == len(msg) +        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( +            nbytes, len(msg))  _semaphore_tracker = SemaphoreTracker() diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 4aba372e48..1f4f3f496f 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -93,7 +93,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):      '''      Run code specified by data received over pipe      ''' -    assert is_forking(sys.argv) +    assert is_forking(sys.argv), "Not forking"      if sys.platform == 'win32':          import msvcrt          new_handle = reduction.steal_handle(parent_pid, pipe_handle) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 0590ed68f5..038f73f6b7 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -270,13 +270,16 @@ class Condition(object):      def notify(self, n=1):          assert self._lock._semlock._is_mine(), 'lock is not owned' -        assert not self._wait_semaphore.acquire(False) +        assert not self._wait_semaphore.acquire( +            False), ('notify: Should not have been able to acquire' +                     + '_wait_semaphore')          # to take account of timeouts since last notify*() we subtract          # woken_count from sleeping_count and rezero woken_count          while self._woken_count.acquire(False):              res = self._sleeping_count.acquire(False) -            assert res +            assert res, ('notify: Bug in sleeping_count.acquire' +                         + '- res should not be False')          sleepers = 0          while sleepers < n and self._sleeping_count.acquire(False): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index b490caa7e6..f0827f0a96 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -149,12 +149,15 @@ class Finalize(object):      Class which supports object finalization using weakrefs      '''      def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): -        assert exitpriority is None or type(exitpriority) is int +        if (exitpriority is not None) and not isinstance(exitpriority,int): +            raise TypeError( +                "Exitpriority ({0!r}) must be None or int, not {1!s}".format( +                    exitpriority, type(exitpriority)))          if obj is not None:              self._weakref = weakref.ref(obj, self) -        else: -            assert exitpriority is not None +        elif exitpriority is None: +            raise ValueError("Without object, exitpriority cannot be None")          self._callback = callback          self._args = args | 
