blob: 374fa23820bd7b9cd8b5403cd0f322d49fbddcfb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
from sqlalchemy import event
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling
pool = None
class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults):
__requires__ = ("cpython", "python_profiling_backend")
class Connection:
def rollback(self):
pass
def close(self):
pass
def setup_test(self):
# create a throwaway pool which
# has the effect of initializing
# class-level event listeners on Pool,
# if not present already.
p1 = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
p1.connect()
global pool
pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
# make this a real world case where we have a "connect" handler
@event.listens_for(pool, "connect")
def do_connect(dbapi_conn, conn_record):
pass
@profiling.function_call_count()
def test_first_connect(self):
pool.connect()
def test_second_connect(self):
conn = pool.connect()
conn.close()
@profiling.function_call_count()
def go():
conn2 = pool.connect()
return conn2
go()
|