summaryrefslogtreecommitdiff
path: root/tests/connection_pool.py
blob: 56f5f43c68f8a5afdba6e72a58eac98adca5d8c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import redis
import threading
import time
import unittest

class ConnectionPoolTestCase(unittest.TestCase):
    def test_multiple_connections(self):
        # 2 clients to the same host/port/db/pool should use the same connection
        pool = redis.ConnectionPool()
        r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
        r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
        self.assertEquals(r1.connection, r2.connection)

        # if one of them switches, they should have
        # separate conncetion objects
        r2.select(db=10, host='localhost', port=6379)
        self.assertNotEqual(r1.connection, r2.connection)

        conns = [r1.connection, r2.connection]
        conns.sort()

        # but returning to the original state shares the object again
        r2.select(db=9, host='localhost', port=6379)
        self.assertEquals(r1.connection, r2.connection)

        # the connection manager should still have just 2 connections
        mgr_conns = pool.get_all_connections()
        mgr_conns.sort()
        self.assertEquals(conns, mgr_conns)

    def test_threaded_workers(self):
        r = redis.Redis(host='localhost', port=6379, db=9)
        r.set('a', 'foo')
        r.set('b', 'bar')

        def _info_worker():
            for i in range(50):
                _ = r.info()
                time.sleep(0.01)

        def _keys_worker():
            for i in range(50):
                _ = r.keys()
                time.sleep(0.01)

        t1 = threading.Thread(target=_info_worker)
        t2 = threading.Thread(target=_keys_worker)
        t1.start()
        t2.start()

        for i in [t1, t2]:
            i.join()