1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import redis
import threading
import time
import unittest
class ConnectionPoolTestCase(unittest.TestCase):
    """Tests for connection sharing via ``redis.ConnectionPool`` and for
    using a single client from several threads.

    The tests address a Redis server at localhost:6379 and use databases
    9 and 10.
    """

    def test_multiple_connections(self):
        """Clients with the same host/port/db on one pool share a connection."""
        # 2 clients to the same host/port/db/pool should use the same
        # connection
        pool = redis.ConnectionPool()
        r1 = redis.Redis(host='localhost', port=6379, db=9,
                         connection_pool=pool)
        r2 = redis.Redis(host='localhost', port=6379, db=9,
                         connection_pool=pool)
        self.assertEqual(r1.connection, r2.connection)

        # if one of them switches, they should have
        # separate connection objects
        r2.select(db=10, host='localhost', port=6379)
        self.assertNotEqual(r1.connection, r2.connection)
        # Order by identity: the final comparison must not depend on the
        # order in which the pool reports its connections, and arbitrary
        # objects have no default ordering on Python 3.
        conns = sorted([r1.connection, r2.connection], key=id)

        # but returning to the original state shares the object again
        r2.select(db=9, host='localhost', port=6379)
        self.assertEqual(r1.connection, r2.connection)

        # the connection manager should still have just 2 connections
        # (sorted() builds a new list, so whatever the pool returned is
        # left unmodified)
        mgr_conns = sorted(pool.get_all_connections(), key=id)
        self.assertEqual(conns, mgr_conns)

    def test_threaded_workers(self):
        """Run ``info()`` and ``keys()`` concurrently on one shared client."""
        r = redis.Redis(host='localhost', port=6379, db=9)
        r.set('a', 'foo')
        r.set('b', 'bar')

        # An exception raised inside a thread does not propagate to the
        # thread that joins it, so failures are collected here and checked
        # after both workers have finished.
        errors = []

        def _repeat(command):
            # Issue the command 50 times with a short pause between calls
            # so the two workers interleave on the shared client.
            try:
                for _ in range(50):
                    command()
                    time.sleep(0.01)
            except Exception as exc:
                # Deliberately broad: any failure must fail the test.
                errors.append(exc)

        def _info_worker():
            _repeat(r.info)

        def _keys_worker():
            _repeat(r.keys)

        workers = [
            threading.Thread(target=_info_worker),
            threading.Thread(target=_keys_worker),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        self.assertEqual(errors, [])
|