diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-30 18:35:12 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-30 18:35:12 -0400 |
commit | b38a76cd1d47cd6b8f1abef30ad7c3aeaa27d537 (patch) | |
tree | 7af1dba9e242c77a248cb2194434aa9bf3ca49b7 /test/engine/test_pool.py | |
parent | 715d6cf3d10a71acd7726b7e00c3ff40b4559bc7 (diff) | |
download | sqlalchemy-b38a76cd1d47cd6b8f1abef30ad7c3aeaa27d537.tar.gz |
- replace most explicitly-named test objects called "Mock..." with
actual mock objects from the mock library. I'd like to use mock
for new tests so we might as well use it in obvious places.
- use unittest.mock in py3.3
- changelog
- add a note to README.unittests
- add tests_require in setup.py
- have tests import from sqlalchemy.testing.mock
- apply usage of mock to one of the event tests. we can be using
this approach all over the place.
Diffstat (limited to 'test/engine/test_pool.py')
-rw-r--r-- | test/engine/test_pool.py | 149 |
1 files changed, 85 insertions, 64 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 583978465..981df6dd0 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -4,37 +4,21 @@ from sqlalchemy import pool, select, event import sqlalchemy as tsa from sqlalchemy import testing from sqlalchemy.testing.util import gc_collect, lazy_gc -from sqlalchemy.testing import eq_, assert_raises +from sqlalchemy.testing import eq_, assert_raises, is_not_ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing import fixtures -mcid = 1 -class MockDBAPI(object): - throw_error = False - def connect(self, *args, **kwargs): - if self.throw_error: - raise Exception("couldnt connect !") - delay = kwargs.pop('delay', 0) - if delay: - time.sleep(delay) - return MockConnection() -class MockConnection(object): - closed = False - def __init__(self): - global mcid - self.id = mcid - mcid += 1 - def close(self): - self.closed = True - def rollback(self): - pass - def cursor(self): - return MockCursor() -class MockCursor(object): - def execute(self, *args, **kw): - pass - def close(self): - pass +from sqlalchemy.testing.mock import Mock, call + +def MockDBAPI(): + def cursor(): + while True: + yield Mock() + def connect(): + while True: + yield Mock(cursor=Mock(side_effect=cursor())) + + return Mock(connect=Mock(side_effect=connect())) class PoolTestBase(fixtures.TestBase): def setup(self): @@ -71,11 +55,9 @@ class PoolTest(PoolTestBase): assert c4 is not c5 def test_manager_with_key(self): - class NoKws(object): - def connect(self, arg): - return MockConnection() - manager = pool.manage(NoKws(), use_threadlocal=True) + dbapi = MockDBAPI() + manager = pool.manage(dbapi, use_threadlocal=True) c1 = manager.connect('foo.db', sa_pool_key="a") c2 = manager.connect('foo.db', sa_pool_key="b") @@ -83,9 +65,14 @@ class PoolTest(PoolTestBase): assert c1.cursor() is not None assert c1 is not c2 - assert c1 is c3 - + assert c1 is c3 + eq_(dbapi.connect.mock_calls, + [ + call("foo.db"), + call("foo.db"), + ] + ) def test_bad_args(self): @@ -127,7 +114,7 @@ class PoolTest(PoolTestBase): p = cls(creator=mock_dbapi.connect) conn = p.connect() conn.close() - mock_dbapi.throw_error = True + mock_dbapi.connect.side_effect = Exception("error!") p.dispose() p.recreate() @@ -211,9 +198,9 @@ class PoolTest(PoolTestBase): self.assert_('foo2' in c.info) c2 = p.connect() - self.assert_(c.connection is not c2.connection) - self.assert_(not c2.info) - self.assert_('foo2' in c.info) + is_not_(c.connection, c2.connection) + assert not c2.info + assert 'foo2' in c.info class PoolDialectTest(PoolTestBase): @@ -945,19 +932,24 @@ class QueuePoolTest(PoolTestBase): def test_dispose_closes_pooled(self): dbapi = MockDBAPI() - def creator(): - return dbapi.connect() - p = pool.QueuePool(creator=creator, + p = pool.QueuePool(creator=dbapi.connect, pool_size=2, timeout=None, max_overflow=0) c1 = p.connect() c2 = p.connect() - conns = [c1.connection, c2.connection] + c1_con = c1.connection + c2_con = c2.connection + c1.close() - eq_([c.closed for c in conns], [False, False]) + + eq_(c1_con.close.call_count, 0) + eq_(c2_con.close.call_count, 0) + p.dispose() - eq_([c.closed for c in conns], [True, False]) + + eq_(c1_con.close.call_count, 1) + eq_(c2_con.close.call_count, 0) # currently, if a ConnectionFairy is closed # after the pool has been disposed, there's no @@ -965,11 +957,12 @@ class QueuePoolTest(PoolTestBase): # immediately - it just gets returned to the # pool normally... c2.close() - eq_([c.closed for c in conns], [True, False]) + eq_(c1_con.close.call_count, 1) + eq_(c2_con.close.call_count, 0) # ...and that's the one we'll get back next. c3 = p.connect() - assert c3.connection is conns[1] + assert c3.connection is c2_con def test_no_overflow(self): self._test_overflow(40, 0) @@ -1010,12 +1003,21 @@ class QueuePoolTest(PoolTestBase): return c for j in range(5): + # open 4 conns at a time. each time this + # will yield two pooled connections + two + # overflow connections. conns = [_conn() for i in range(4)] for c in conns: c.close() - still_opened = len([c for c in strong_refs if not c.closed]) - eq_(still_opened, 2) + # doing that for a total of 5 times yields + # ten overflow connections closed plus the + # two pooled connections unclosed. + + eq_( + set([c.close.call_count for c in strong_refs]), + set([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0]) + ) @testing.requires.predictable_gc def test_weakref_kaboom(self): @@ -1108,18 +1110,30 @@ class QueuePoolTest(PoolTestBase): dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0) c1 = p.connect() c1.detach() - c_id = c1.connection.id - c2 = p.connect() - assert c2.connection.id != c1.connection.id - dbapi.raise_error = True - c2.invalidate() - c2 = None c2 = p.connect() - assert c2.connection.id != c1.connection.id - con = c1.connection - assert not con.closed + eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")]) + + c1_con = c1.connection + assert c1_con is not None + eq_(c1_con.close.call_count, 0) c1.close() - assert con.closed + eq_(c1_con.close.call_count, 1) + + def test_detach_via_invalidate(self): + dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0) + + c1 = p.connect() + c1_con = c1.connection + c1.invalidate() + assert c1.connection is None + eq_(c1_con.close.call_count, 1) + + c2 = p.connect() + assert c2.connection is not c1_con + c2_con = c2.connection + + c2.close() + eq_(c2_con.close.call_count, 0) def test_threadfairy(self): p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True) @@ -1141,8 +1155,13 @@ class SingletonThreadPoolTest(PoolTestBase): been called.""" dbapi = MockDBAPI() - p = pool.SingletonThreadPool(creator=dbapi.connect, - pool_size=3) + + lock = threading.Lock() + def creator(): + # the mock iterator isn't threadsafe... + with lock: + return dbapi.connect() + p = pool.SingletonThreadPool(creator=creator, pool_size=3) if strong_refs: sr = set() @@ -1172,7 +1191,7 @@ class SingletonThreadPoolTest(PoolTestBase): assert len(p._all_conns) == 3 if strong_refs: - still_opened = len([c for c in sr if not c.closed]) + still_opened = len([c for c in sr if not c.close.call_count]) eq_(still_opened, 3) class AssertionPoolTest(PoolTestBase): @@ -1198,17 +1217,19 @@ class NullPoolTest(PoolTestBase): dbapi = MockDBAPI() p = pool.NullPool(creator=lambda: dbapi.connect('foo.db')) c1 = p.connect() - c_id = c1.connection.id + c1.close() c1 = None c1 = p.connect() - dbapi.raise_error = True c1.invalidate() c1 = None c1 = p.connect() - assert c1.connection.id != c_id + dbapi.connect.assert_has_calls([ + call('foo.db'), + call('foo.db')], + any_order=True) class StaticPoolTest(PoolTestBase): |