diff options
Diffstat (limited to 'Doc/includes/mp_synchronize.py')
| -rw-r--r-- | Doc/includes/mp_synchronize.py | 273 | 
1 files changed, 273 insertions, 0 deletions
| diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py new file mode 100644 index 0000000000..8cf11bd4d0 --- /dev/null +++ b/Doc/includes/mp_synchronize.py @@ -0,0 +1,273 @@ +# +# A test file for the `multiprocessing` package +# + +import time, sys, random +from Queue import Empty + +import multiprocessing               # may get overwritten + + +#### TEST_VALUE + +def value_func(running, mutex): +    random.seed() +    time.sleep(random.random()*4) + +    mutex.acquire() +    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished' +    running.value -= 1 +    mutex.release() + +def test_value(): +    TASKS = 10 +    running = multiprocessing.Value('i', TASKS) +    mutex = multiprocessing.Lock() + +    for i in range(TASKS): +        p = multiprocessing.Process(target=value_func, args=(running, mutex)) +        p.start() + +    while running.value > 0: +        time.sleep(0.08) +        mutex.acquire() +        print running.value, +        sys.stdout.flush() +        mutex.release() + +    print +    print 'No more running processes' + + +#### TEST_QUEUE + +def queue_func(queue): +    for i in range(30): +        time.sleep(0.5 * random.random()) +        queue.put(i*i) +    queue.put('STOP') + +def test_queue(): +    q = multiprocessing.Queue() + +    p = multiprocessing.Process(target=queue_func, args=(q,)) +    p.start() + +    o = None +    while o != 'STOP': +        try: +            o = q.get(timeout=0.3) +            print o, +            sys.stdout.flush() +        except Empty: +            print 'TIMEOUT' + +    print + + +#### TEST_CONDITION + +def condition_func(cond): +    cond.acquire() +    print '\t' + str(cond) +    time.sleep(2) +    print '\tchild is notifying' +    print '\t' + str(cond) +    cond.notify() +    cond.release() + +def test_condition(): +    cond = multiprocessing.Condition() + +    p = multiprocessing.Process(target=condition_func, args=(cond,)) +    print cond + +    cond.acquire() +    print cond +    cond.acquire() +    print cond + +    p.start() + +    print 'main is waiting' +    cond.wait() +    print 'main has woken up' + +    print cond +    cond.release() +    print cond +    cond.release() + +    p.join() +    print cond + + +#### TEST_SEMAPHORE + +def semaphore_func(sema, mutex, running): +    sema.acquire() + +    mutex.acquire() +    running.value += 1 +    print running.value, 'tasks are running' +    mutex.release() + +    random.seed() +    time.sleep(random.random()*2) + +    mutex.acquire() +    running.value -= 1 +    print '%s has finished' % multiprocessing.current_process() +    mutex.release() + +    sema.release() + +def test_semaphore(): +    sema = multiprocessing.Semaphore(3) +    mutex = multiprocessing.RLock() +    running = multiprocessing.Value('i', 0) + +    processes = [ +        multiprocessing.Process(target=semaphore_func, +                                args=(sema, mutex, running)) +        for i in range(10) +        ] + +    for p in processes: +        p.start() + +    for p in processes: +        p.join() + + +#### TEST_JOIN_TIMEOUT + +def join_timeout_func(): +    print '\tchild sleeping' +    time.sleep(5.5) +    print '\n\tchild terminating' + +def test_join_timeout(): +    p = multiprocessing.Process(target=join_timeout_func) +    p.start() + +    print 'waiting for process to finish' + +    while 1: +        p.join(timeout=1) +        if not p.is_alive(): +            break +        print '.', +        sys.stdout.flush() + + +#### TEST_EVENT + +def event_func(event): +    print '\t%r is waiting' % multiprocessing.current_process() +    event.wait() +    print '\t%r has woken up' % multiprocessing.current_process() + +def test_event(): +    event = multiprocessing.Event() + +    processes = [multiprocessing.Process(target=event_func, args=(event,)) +                 for i in range(5)] + +    for p in processes: +        p.start() + +    print 'main is sleeping' +    time.sleep(2) + +    print 'main is setting event' +    event.set() + +    for p in processes: +        p.join() + + +#### TEST_SHAREDVALUES + +def sharedvalues_func(values, arrays, shared_values, shared_arrays): +    for i in range(len(values)): +        v = values[i][1] +        sv = shared_values[i].value +        assert v == sv + +    for i in range(len(values)): +        a = arrays[i][1] +        sa = list(shared_arrays[i][:]) +        assert a == sa + +    print 'Tests passed' + +def test_sharedvalues(): +    values = [ +        ('i', 10), +        ('h', -2), +        ('d', 1.25) +        ] +    arrays = [ +        ('i', range(100)), +        ('d', [0.25 * i for i in range(100)]), +        ('H', range(1000)) +        ] + +    shared_values = [multiprocessing.Value(id, v) for id, v in values] +    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] + +    p = multiprocessing.Process( +        target=sharedvalues_func, +        args=(values, arrays, shared_values, shared_arrays) +        ) +    p.start() +    p.join() + +    assert p.get_exitcode() == 0 + + +#### + +def test(namespace=multiprocessing): +    global multiprocessing + +    multiprocessing = namespace + +    for func in [ test_value, test_queue, test_condition, +                  test_semaphore, test_join_timeout, test_event, +                  test_sharedvalues ]: + +        print '\n\t######## %s\n' % func.__name__ +        func() + +    ignore = multiprocessing.active_children()      # cleanup any old processes +    if hasattr(multiprocessing, '_debug_info'): +        info = multiprocessing._debug_info() +        if info: +            print info +            raise ValueError, 'there should be no positive refcounts left' + + +if __name__ == '__main__': +    multiprocessing.freeze_support() + +    assert len(sys.argv) in (1, 2) + +    if len(sys.argv) == 1 or sys.argv[1] == 'processes': +        print ' Using processes '.center(79, '-') +        namespace = multiprocessing +    elif sys.argv[1] == 'manager': +        print ' Using processes and a manager '.center(79, '-') +        namespace = multiprocessing.Manager() +        namespace.Process = multiprocessing.Process +        namespace.current_process = multiprocessing.current_process +        namespace.active_children = multiprocessing.active_children +    elif sys.argv[1] == 'threads': +        print ' Using threads '.center(79, '-') +        import multiprocessing.dummy as namespace +    else: +        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0] +        raise SystemExit, 2 + +    test(namespace) | 
