summaryrefslogtreecommitdiff
path: root/test/engine/test_deprecations.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_deprecations.py')
-rw-r--r--test/engine/test_deprecations.py611
1 files changed, 0 insertions, 611 deletions
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 502fe0245..b7c3cf52f 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -1,5 +1,3 @@
-import re
-
import sqlalchemy as tsa
from sqlalchemy import column
from sqlalchemy import create_engine
@@ -17,7 +15,6 @@ from sqlalchemy import text
from sqlalchemy import TypeDecorator
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
-from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import engines
@@ -28,7 +25,6 @@ from sqlalchemy.testing import is_true
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-from sqlalchemy.testing.util import lazy_gc
class SomeException(Exception):
@@ -64,15 +60,6 @@ class CreateEngineTest(fixtures.TestBase):
)
-def _proxy_execute_deprecated():
- return (
- testing.expect_deprecated("ConnectionProxy.execute is deprecated."),
- testing.expect_deprecated(
- "ConnectionProxy.cursor_execute is deprecated."
- ),
- )
-
-
class TransactionTest(fixtures.TestBase):
__backend__ = True
@@ -128,289 +115,6 @@ class TransactionTest(fixtures.TestBase):
eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
-class ProxyConnectionTest(fixtures.TestBase):
-
- """These are the same tests as EngineEventsTest, except using
- the deprecated ConnectionProxy interface.
-
- """
-
- __requires__ = ("ad_hoc_engines",)
- __prefer_requires__ = ("two_phase_transactions",)
-
- @testing.uses_deprecated(r".*Use event.listen")
- @testing.fails_on("firebird", "Data type unknown")
- def test_proxy(self):
-
- stmts = []
- cursor_stmts = []
-
- class MyProxy(ConnectionProxy):
- def execute(
- self, conn, execute, clauseelement, *multiparams, **params
- ):
- stmts.append((str(clauseelement), params, multiparams))
- return execute(clauseelement, *multiparams, **params)
-
- def cursor_execute(
- self,
- execute,
- cursor,
- statement,
- parameters,
- context,
- executemany,
- ):
- cursor_stmts.append((str(statement), parameters, None))
- return execute(cursor, statement, parameters, context)
-
- def assert_stmts(expected, received):
- for stmt, params, posn in expected:
- if not received:
- assert False, "Nothing available for stmt: %s" % stmt
- while received:
- teststmt, testparams, testmultiparams = received.pop(0)
- teststmt = (
- re.compile(r"[\n\t ]+", re.M)
- .sub(" ", teststmt)
- .strip()
- )
- if teststmt.startswith(stmt) and (
- testparams == params or testparams == posn
- ):
- break
-
- with testing.expect_deprecated(
- "ConnectionProxy.execute is deprecated.",
- "ConnectionProxy.cursor_execute is deprecated.",
- ):
- plain_engine = engines.testing_engine(
- options=dict(implicit_returning=False, proxy=MyProxy())
- )
-
- for engine in (plain_engine,):
- m = MetaData(engine)
- t1 = Table(
- "t1",
- m,
- Column("c1", Integer, primary_key=True),
- Column(
- "c2",
- String(50),
- default=func.lower("Foo"),
- primary_key=True,
- ),
- )
- m.create_all()
- try:
- t1.insert().execute(c1=5, c2="some data")
- t1.insert().execute(c1=6)
- eq_(
- engine.execute("select * from t1").fetchall(),
- [(5, "some data"), (6, "foo")],
- )
- finally:
- m.drop_all()
- engine.dispose()
- compiled = [
- ("CREATE TABLE t1", {}, None),
- (
- "INSERT INTO t1 (c1, c2)",
- {"c2": "some data", "c1": 5},
- None,
- ),
- ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None),
- ("select * from t1", {}, None),
- ("DROP TABLE t1", {}, None),
- ]
-
- cursor = [
- ("CREATE TABLE t1", {}, ()),
- (
- "INSERT INTO t1 (c1, c2)",
- {"c2": "some data", "c1": 5},
- (5, "some data"),
- ),
- ("SELECT lower", {"lower_1": "Foo"}, ("Foo",)),
- (
- "INSERT INTO t1 (c1, c2)",
- {"c2": "foo", "c1": 6},
- (6, "foo"),
- ),
- ("select * from t1", {}, ()),
- ("DROP TABLE t1", {}, ()),
- ]
-
- assert_stmts(compiled, stmts)
- assert_stmts(cursor, cursor_stmts)
-
- @testing.uses_deprecated(r".*Use event.listen")
- def test_options(self):
- canary = []
-
- class TrackProxy(ConnectionProxy):
- def __getattribute__(self, key):
- fn = object.__getattribute__(self, key)
-
- def go(*arg, **kw):
- canary.append(fn.__name__)
- return fn(*arg, **kw)
-
- return go
-
- with testing.expect_deprecated(
- *[
- "ConnectionProxy.%s is deprecated" % name
- for name in [
- "execute",
- "cursor_execute",
- "begin",
- "rollback",
- "commit",
- "savepoint",
- "rollback_savepoint",
- "release_savepoint",
- "begin_twophase",
- "prepare_twophase",
- "rollback_twophase",
- "commit_twophase",
- ]
- ]
- ):
- engine = engines.testing_engine(options={"proxy": TrackProxy()})
- conn = engine.connect()
- c2 = conn.execution_options(foo="bar")
- eq_(c2._execution_options, {"foo": "bar"})
- c2.execute(select([1]))
- c3 = c2.execution_options(bar="bat")
- eq_(c3._execution_options, {"foo": "bar", "bar": "bat"})
- eq_(canary, ["execute", "cursor_execute"])
-
- @testing.uses_deprecated(r".*Use event.listen")
- def test_transactional(self):
- canary = []
-
- class TrackProxy(ConnectionProxy):
- def __getattribute__(self, key):
- fn = object.__getattribute__(self, key)
-
- def go(*arg, **kw):
- canary.append(fn.__name__)
- return fn(*arg, **kw)
-
- return go
-
- with testing.expect_deprecated(
- *[
- "ConnectionProxy.%s is deprecated" % name
- for name in [
- "execute",
- "cursor_execute",
- "begin",
- "rollback",
- "commit",
- "savepoint",
- "rollback_savepoint",
- "release_savepoint",
- "begin_twophase",
- "prepare_twophase",
- "rollback_twophase",
- "commit_twophase",
- ]
- ]
- ):
- engine = engines.testing_engine(options={"proxy": TrackProxy()})
- conn = engine.connect()
- trans = conn.begin()
- conn.execute(select([1]))
- trans.rollback()
- trans = conn.begin()
- conn.execute(select([1]))
- trans.commit()
-
- eq_(
- canary,
- [
- "begin",
- "execute",
- "cursor_execute",
- "rollback",
- "begin",
- "execute",
- "cursor_execute",
- "commit",
- ],
- )
-
- @testing.uses_deprecated(r".*Use event.listen")
- @testing.requires.savepoints
- @testing.requires.two_phase_transactions
- def test_transactional_advanced(self):
- canary = []
-
- class TrackProxy(ConnectionProxy):
- def __getattribute__(self, key):
- fn = object.__getattribute__(self, key)
-
- def go(*arg, **kw):
- canary.append(fn.__name__)
- return fn(*arg, **kw)
-
- return go
-
- with testing.expect_deprecated(
- *[
- "ConnectionProxy.%s is deprecated" % name
- for name in [
- "execute",
- "cursor_execute",
- "begin",
- "rollback",
- "commit",
- "savepoint",
- "rollback_savepoint",
- "release_savepoint",
- "begin_twophase",
- "prepare_twophase",
- "rollback_twophase",
- "commit_twophase",
- ]
- ]
- ):
- engine = engines.testing_engine(options={"proxy": TrackProxy()})
- conn = engine.connect()
-
- trans = conn.begin()
- trans2 = conn.begin_nested()
- conn.execute(select([1]))
- trans2.rollback()
- trans2 = conn.begin_nested()
- conn.execute(select([1]))
- trans2.commit()
- trans.rollback()
-
- trans = conn.begin_twophase()
- conn.execute(select([1]))
- trans.prepare()
- trans.commit()
-
- canary = [t for t in canary if t not in ("cursor_execute", "execute")]
- eq_(
- canary,
- [
- "begin",
- "savepoint",
- "rollback_savepoint",
- "savepoint",
- "release_savepoint",
- "rollback",
- "begin_twophase",
- "prepare_twophase",
- "commit_twophase",
- ],
- )
-
-
class HandleInvalidatedOnConnectTest(fixtures.TestBase):
__requires__ = ("sqlite",)
@@ -570,321 +274,6 @@ class PoolTestBase(fixtures.TestBase):
)
-class DeprecatedPoolListenerTest(PoolTestBase):
- @testing.requires.predictable_gc
- @testing.uses_deprecated(
- r".*Use the PoolEvents", r".*'listeners' argument .* is deprecated"
- )
- def test_listeners(self):
- class InstrumentingListener(object):
- def __init__(self):
- if hasattr(self, "connect"):
- self.connect = self.inst_connect
- if hasattr(self, "first_connect"):
- self.first_connect = self.inst_first_connect
- if hasattr(self, "checkout"):
- self.checkout = self.inst_checkout
- if hasattr(self, "checkin"):
- self.checkin = self.inst_checkin
- self.clear()
-
- def clear(self):
- self.connected = []
- self.first_connected = []
- self.checked_out = []
- self.checked_in = []
-
- def assert_total(self, conn, fconn, cout, cin):
- eq_(len(self.connected), conn)
- eq_(len(self.first_connected), fconn)
- eq_(len(self.checked_out), cout)
- eq_(len(self.checked_in), cin)
-
- def assert_in(self, item, in_conn, in_fconn, in_cout, in_cin):
- eq_((item in self.connected), in_conn)
- eq_((item in self.first_connected), in_fconn)
- eq_((item in self.checked_out), in_cout)
- eq_((item in self.checked_in), in_cin)
-
- def inst_connect(self, con, record):
- print("connect(%s, %s)" % (con, record))
- assert con is not None
- assert record is not None
- self.connected.append(con)
-
- def inst_first_connect(self, con, record):
- print("first_connect(%s, %s)" % (con, record))
- assert con is not None
- assert record is not None
- self.first_connected.append(con)
-
- def inst_checkout(self, con, record, proxy):
- print("checkout(%s, %s, %s)" % (con, record, proxy))
- assert con is not None
- assert record is not None
- assert proxy is not None
- self.checked_out.append(con)
-
- def inst_checkin(self, con, record):
- print("checkin(%s, %s)" % (con, record))
- # con can be None if invalidated
- assert record is not None
- self.checked_in.append(con)
-
- class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
- pass
-
- class ListenConnect(InstrumentingListener):
- def connect(self, con, record):
- pass
-
- class ListenFirstConnect(InstrumentingListener):
- def first_connect(self, con, record):
- pass
-
- class ListenCheckOut(InstrumentingListener):
- def checkout(self, con, record, proxy, num):
- pass
-
- class ListenCheckIn(InstrumentingListener):
- def checkin(self, con, record):
- pass
-
- def assert_listeners(p, total, conn, fconn, cout, cin):
- for instance in (p, p.recreate()):
- self.assert_(len(instance.dispatch.connect) == conn)
- self.assert_(len(instance.dispatch.first_connect) == fconn)
- self.assert_(len(instance.dispatch.checkout) == cout)
- self.assert_(len(instance.dispatch.checkin) == cin)
-
- p = self._queuepool_fixture()
- assert_listeners(p, 0, 0, 0, 0, 0)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["connect", "first_connect", "checkout", "checkin"]
- ]
- ):
- p.add_listener(ListenAll())
- assert_listeners(p, 1, 1, 1, 1, 1)
-
- with testing.expect_deprecated(
- *["PoolListener.%s is deprecated." % name for name in ["connect"]]
- ):
- p.add_listener(ListenConnect())
- assert_listeners(p, 2, 2, 1, 1, 1)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["first_connect"]
- ]
- ):
- p.add_listener(ListenFirstConnect())
- assert_listeners(p, 3, 2, 2, 1, 1)
-
- with testing.expect_deprecated(
- *["PoolListener.%s is deprecated." % name for name in ["checkout"]]
- ):
- p.add_listener(ListenCheckOut())
- assert_listeners(p, 4, 2, 2, 2, 1)
-
- with testing.expect_deprecated(
- *["PoolListener.%s is deprecated." % name for name in ["checkin"]]
- ):
- p.add_listener(ListenCheckIn())
- assert_listeners(p, 5, 2, 2, 2, 2)
- del p
-
- snoop = ListenAll()
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["connect", "first_connect", "checkout", "checkin"]
- ]
- + [
- "PoolListener is deprecated in favor of the PoolEvents "
- "listener interface. The Pool.listeners parameter "
- "will be removed"
- ]
- ):
- p = self._queuepool_fixture(listeners=[snoop])
- assert_listeners(p, 1, 1, 1, 1, 1)
-
- c = p.connect()
- snoop.assert_total(1, 1, 1, 0)
- cc = c.connection
- snoop.assert_in(cc, True, True, True, False)
- c.close()
- snoop.assert_in(cc, True, True, True, True)
- del c, cc
-
- snoop.clear()
-
- # this one depends on immediate gc
- c = p.connect()
- cc = c.connection
- snoop.assert_in(cc, False, False, True, False)
- snoop.assert_total(0, 0, 1, 0)
- del c, cc
- lazy_gc()
- snoop.assert_total(0, 0, 1, 1)
-
- p.dispose()
- snoop.clear()
-
- c = p.connect()
- c.close()
- c = p.connect()
- snoop.assert_total(1, 0, 2, 1)
- c.close()
- snoop.assert_total(1, 0, 2, 2)
-
- # invalidation
- p.dispose()
- snoop.clear()
-
- c = p.connect()
- snoop.assert_total(1, 0, 1, 0)
- c.invalidate()
- snoop.assert_total(1, 0, 1, 1)
- c.close()
- snoop.assert_total(1, 0, 1, 1)
- del c
- lazy_gc()
- snoop.assert_total(1, 0, 1, 1)
- c = p.connect()
- snoop.assert_total(2, 0, 2, 1)
- c.close()
- del c
- lazy_gc()
- snoop.assert_total(2, 0, 2, 2)
-
- # detached
- p.dispose()
- snoop.clear()
-
- c = p.connect()
- snoop.assert_total(1, 0, 1, 0)
- c.detach()
- snoop.assert_total(1, 0, 1, 0)
- c.close()
- del c
- snoop.assert_total(1, 0, 1, 0)
- c = p.connect()
- snoop.assert_total(2, 0, 2, 0)
- c.close()
- del c
- snoop.assert_total(2, 0, 2, 1)
-
- # recreated
- p = p.recreate()
- snoop.clear()
-
- c = p.connect()
- snoop.assert_total(1, 1, 1, 0)
- c.close()
- snoop.assert_total(1, 1, 1, 1)
- c = p.connect()
- snoop.assert_total(1, 1, 2, 1)
- c.close()
- snoop.assert_total(1, 1, 2, 2)
-
- @testing.uses_deprecated(r".*Use the PoolEvents")
- def test_listeners_callables(self):
- def connect(dbapi_con, con_record):
- counts[0] += 1
-
- def checkout(dbapi_con, con_record, con_proxy):
- counts[1] += 1
-
- def checkin(dbapi_con, con_record):
- counts[2] += 1
-
- i_all = dict(connect=connect, checkout=checkout, checkin=checkin)
- i_connect = dict(connect=connect)
- i_checkout = dict(checkout=checkout)
- i_checkin = dict(checkin=checkin)
-
- for cls in (pool.QueuePool, pool.StaticPool):
- counts = [0, 0, 0]
-
- def assert_listeners(p, total, conn, cout, cin):
- for instance in (p, p.recreate()):
- eq_(len(instance.dispatch.connect), conn)
- eq_(len(instance.dispatch.checkout), cout)
- eq_(len(instance.dispatch.checkin), cin)
-
- p = self._queuepool_fixture()
- assert_listeners(p, 0, 0, 0, 0)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["connect", "checkout", "checkin"]
- ]
- ):
- p.add_listener(i_all)
- assert_listeners(p, 1, 1, 1, 1)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["connect"]
- ]
- ):
- p.add_listener(i_connect)
- assert_listeners(p, 2, 1, 1, 1)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["checkout"]
- ]
- ):
- p.add_listener(i_checkout)
- assert_listeners(p, 3, 1, 1, 1)
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["checkin"]
- ]
- ):
- p.add_listener(i_checkin)
- assert_listeners(p, 4, 1, 1, 1)
- del p
-
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["connect", "checkout", "checkin"]
- ]
- + [".*The Pool.listeners parameter will be removed"]
- ):
- p = self._queuepool_fixture(listeners=[i_all])
- assert_listeners(p, 1, 1, 1, 1)
-
- c = p.connect()
- assert counts == [1, 1, 0]
- c.close()
- assert counts == [1, 1, 1]
-
- c = p.connect()
- assert counts == [1, 2, 1]
- with testing.expect_deprecated(
- *[
- "PoolListener.%s is deprecated." % name
- for name in ["checkin"]
- ]
- ):
- p.add_listener(i_checkin)
- c.close()
- assert counts == [1, 2, 2]
-
-
class ExplicitAutoCommitDeprecatedTest(fixtures.TestBase):
"""test the 'autocommit' flag on select() and text() objects.