diff options
Diffstat (limited to 'Doc/includes/mp_pool.py')
| -rw-r--r-- | Doc/includes/mp_pool.py | 311 | 
1 files changed, 311 insertions, 0 deletions
| diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py new file mode 100644 index 0000000000..b937b86f53 --- /dev/null +++ b/Doc/includes/mp_pool.py @@ -0,0 +1,311 @@ +# +# A test of `multiprocessing.Pool` class +# + +import multiprocessing +import time +import random +import sys + +# +# Functions used by test code +# + +def calculate(func, args): +    result = func(*args) +    return '%s says that %s%s = %s' % ( +        multiprocessing.current_process().get_name(), +        func.__name__, args, result +        ) + +def calculatestar(args): +    return calculate(*args) + +def mul(a, b): +    time.sleep(0.5*random.random()) +    return a * b + +def plus(a, b): +    time.sleep(0.5*random.random()) +    return a + b + +def f(x): +    return 1.0 / (x-5.0) + +def pow3(x): +    return x**3 + +def noop(x): +    pass + +# +# Test code +# + +def test(): +    print 'cpu_count() = %d\n' % multiprocessing.cpu_count() + +    # +    # Create pool +    # + +    PROCESSES = 4 +    print 'Creating pool with %d processes\n' % PROCESSES +    pool = multiprocessing.Pool(PROCESSES) +    print 'pool = %s' % pool +    print + +    # +    # Tests +    # + +    TASKS = [(mul, (i, 7)) for i in range(10)] + \ +            [(plus, (i, 8)) for i in range(10)] + +    results = [pool.apply_async(calculate, t) for t in TASKS] +    imap_it = pool.imap(calculatestar, TASKS) +    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) + +    print 'Ordered results using pool.apply_async():' +    for r in results: +        print '\t', r.get() +    print + +    print 'Ordered results using pool.imap():' +    for x in imap_it: +        print '\t', x +    print + +    print 'Unordered results using pool.imap_unordered():' +    for x in imap_unordered_it: +        print '\t', x +    print + +    print 'Ordered results using pool.map() --- will block till complete:' +    for x in pool.map(calculatestar, TASKS): +        print '\t', x +    print + +    # +    # Simple benchmarks +    # + +    N = 100000 +    print 'def pow3(x): return x**3' + +    t = time.time() +    A = map(pow3, xrange(N)) +    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \ +          (N, time.time() - t) + +    t = time.time() +    B = pool.map(pow3, xrange(N)) +    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \ +          (N, time.time() - t) + +    t = time.time() +    C = list(pool.imap(pow3, xrange(N), chunksize=N//8)) +    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \ +          ' seconds' % (N, N//8, time.time() - t) + +    assert A == B == C, (len(A), len(B), len(C)) +    print + +    L = [None] * 1000000 +    print 'def noop(x): pass' +    print 'L = [None] * 1000000' + +    t = time.time() +    A = map(noop, L) +    print '\tmap(noop, L):\n\t\t%s seconds' % \ +          (time.time() - t) + +    t = time.time() +    B = pool.map(noop, L) +    print '\tpool.map(noop, L):\n\t\t%s seconds' % \ +          (time.time() - t) + +    t = time.time() +    C = list(pool.imap(noop, L, chunksize=len(L)//8)) +    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ +          (len(L)//8, time.time() - t) + +    assert A == B == C, (len(A), len(B), len(C)) +    print + +    del A, B, C, L + +    # +    # Test error handling +    # + +    print 'Testing error handling:' + +    try: +        print pool.apply(f, (5,)) +    except ZeroDivisionError: +        print '\tGot ZeroDivisionError as expected from pool.apply()' +    else: +        raise AssertionError, 'expected ZeroDivisionError' + +    try: +        print pool.map(f, range(10)) +    except ZeroDivisionError: +        print '\tGot ZeroDivisionError as expected from pool.map()' +    else: +        raise AssertionError, 'expected ZeroDivisionError' + +    try: +        print list(pool.imap(f, range(10))) +    except ZeroDivisionError: +        print '\tGot ZeroDivisionError as expected from list(pool.imap())' +    else: +        raise AssertionError, 'expected ZeroDivisionError' + +    it = pool.imap(f, range(10)) +    for i in range(10): +        try: +            x = it.next() +        except ZeroDivisionError: +            if i == 5: +                pass +        except StopIteration: +            break +        else: +            if i == 5: +                raise AssertionError, 'expected ZeroDivisionError' + +    assert i == 9 +    print '\tGot ZeroDivisionError as expected from IMapIterator.next()' +    print + +    # +    # Testing timeouts +    # + +    print 'Testing ApplyResult.get() with timeout:', +    res = pool.apply_async(calculate, TASKS[0]) +    while 1: +        sys.stdout.flush() +        try: +            sys.stdout.write('\n\t%s' % res.get(0.02)) +            break +        except multiprocessing.TimeoutError: +            sys.stdout.write('.') +    print +    print + +    print 'Testing IMapIterator.next() with timeout:', +    it = pool.imap(calculatestar, TASKS) +    while 1: +        sys.stdout.flush() +        try: +            sys.stdout.write('\n\t%s' % it.next(0.02)) +        except StopIteration: +            break +        except multiprocessing.TimeoutError: +            sys.stdout.write('.') +    print +    print + +    # +    # Testing callback +    # + +    print 'Testing callback:' + +    A = [] +    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729] + +    r = pool.apply_async(mul, (7, 8), callback=A.append) +    r.wait() + +    r = pool.map_async(pow3, range(10), callback=A.extend) +    r.wait() + +    if A == B: +        print '\tcallbacks succeeded\n' +    else: +        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B) + +    # +    # Check there are no outstanding tasks +    # + +    assert not pool._cache, 'cache = %r' % pool._cache + +    # +    # Check close() methods +    # + +    print 'Testing close():' + +    for worker in pool._pool: +        assert worker.is_alive() + +    result = pool.apply_async(time.sleep, [0.5]) +    pool.close() +    pool.join() + +    assert result.get() is None + +    for worker in pool._pool: +        assert not worker.is_alive() + +    print '\tclose() succeeded\n' + +    # +    # Check terminate() method +    # + +    print 'Testing terminate():' + +    pool = multiprocessing.Pool(2) +    DELTA = 0.1 +    ignore = pool.apply(pow3, [2]) +    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] +    pool.terminate() +    pool.join() + +    for worker in pool._pool: +        assert not worker.is_alive() + +    print '\tterminate() succeeded\n' + +    # +    # Check garbage collection +    # + +    print 'Testing garbage collection:' + +    pool = multiprocessing.Pool(2) +    DELTA = 0.1 +    processes = pool._pool +    ignore = pool.apply(pow3, [2]) +    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + +    results = pool = None + +    time.sleep(DELTA * 2) + +    for worker in processes: +        assert not worker.is_alive() + +    print '\tgarbage collection succeeded\n' + + +if __name__ == '__main__': +    multiprocessing.freeze_support() + +    assert len(sys.argv) in (1, 2) + +    if len(sys.argv) == 1 or sys.argv[1] == 'processes': +        print ' Using processes '.center(79, '-') +    elif sys.argv[1] == 'threads': +        print ' Using threads '.center(79, '-') +        import multiprocessing.dummy as multiprocessing +    else: +        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0] +        raise SystemExit(2) + +    test() | 
