summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py392
1 files changed, 267 insertions, 125 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 7e7126fce..d511dce53 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1115,131 +1115,6 @@ class EngineEventsTest(fixtures.TestBase):
e1.execute(select([1]).compile(dialect=e1.dialect))
e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {})
- def test_exception_event(self):
- engine = engines.testing_engine()
- canary = []
-
- @event.listens_for(engine, 'dbapi_error')
- def err(conn, cursor, stmt, parameters, context, exception):
- canary.append((stmt, parameters, exception))
-
- conn = engine.connect()
- try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
- assert False
- except tsa.exc.DBAPIError as e:
- assert canary[0][2] is e.orig
- assert canary[0][0] == "SELECT FOO FROM I_DONT_EXIST"
-
- def test_exception_event_reraise(self):
- engine = engines.testing_engine()
-
- class MyException(Exception):
- pass
-
- @event.listens_for(engine, 'dbapi_error', retval=True)
- def err(conn, cursor, stmt, parameters, context, exception):
- if "ERROR ONE" in str(stmt):
- return MyException("my exception")
- elif "ERROR TWO" in str(stmt):
- return exception
- else:
- return None
-
- conn = engine.connect()
- # case 1: custom exception
- assert_raises_message(
- MyException,
- "my exception",
- conn.execute, "SELECT 'ERROR ONE' FROM I_DONT_EXIST"
- )
- # case 2: return the DBAPI exception we're given;
- # no wrapping should occur
- assert_raises(
- conn.dialect.dbapi.Error,
- conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST"
- )
- # case 3: normal wrapping
- assert_raises(
- tsa.exc.DBAPIError,
- conn.execute, "SELECT 'ERROR THREE' FROM I_DONT_EXIST"
- )
-
- def test_exception_event_reraise_chaining(self):
- engine = engines.testing_engine()
-
- class MyException1(Exception):
- pass
-
- class MyException2(Exception):
- pass
-
- class MyException3(Exception):
- pass
-
- @event.listens_for(engine, 'dbapi_error', retval=True)
- def err1(conn, cursor, stmt, parameters, context, exception):
- if "ERROR ONE" in str(stmt) or "ERROR TWO" in str(stmt) \
- or "ERROR THREE" in str(stmt):
- return MyException1("my exception")
- elif "ERROR FOUR" in str(stmt):
- raise MyException3("my exception short circuit")
-
- @event.listens_for(engine, 'dbapi_error', retval=True)
- def err2(conn, cursor, stmt, parameters, context, exception):
- if ("ERROR ONE" in str(stmt) or "ERROR FOUR" in str(stmt)) \
- and isinstance(exception, MyException1):
- raise MyException2("my exception chained")
- elif "ERROR TWO" in str(stmt):
- return exception
- else:
- return None
-
- conn = engine.connect()
-
- with patch.object(engine.
- dialect.execution_ctx_cls, "handle_dbapi_exception") as patched:
- assert_raises_message(
- MyException2,
- "my exception chained",
- conn.execute, "SELECT 'ERROR ONE' FROM I_DONT_EXIST"
- )
- eq_(patched.call_count, 1)
-
- with patch.object(engine.
- dialect.execution_ctx_cls, "handle_dbapi_exception") as patched:
- assert_raises(
- MyException1,
- conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST"
- )
- eq_(patched.call_count, 1)
-
- with patch.object(engine.
- dialect.execution_ctx_cls, "handle_dbapi_exception") as patched:
- # test that non None from err1 isn't cancelled out
- # by err2
- assert_raises(
- MyException1,
- conn.execute, "SELECT 'ERROR THREE' FROM I_DONT_EXIST"
- )
- eq_(patched.call_count, 1)
-
- with patch.object(engine.
- dialect.execution_ctx_cls, "handle_dbapi_exception") as patched:
- assert_raises(
- tsa.exc.DBAPIError,
- conn.execute, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST"
- )
- eq_(patched.call_count, 1)
-
- with patch.object(engine.
- dialect.execution_ctx_cls, "handle_dbapi_exception") as patched:
- assert_raises_message(
- MyException3,
- "my exception short circuit",
- conn.execute, "SELECT 'ERROR FOUR' FROM I_DONT_EXIST"
- )
- eq_(patched.call_count, 1)
@@ -1516,6 +1391,273 @@ class EngineEventsTest(fixtures.TestBase):
'prepare_twophase', 'commit_twophase']
)
+class HandleErrorTest(fixtures.TestBase):
+ __requires__ = 'ad_hoc_engines',
+ __backend__ = True
+
+ def tearDown(self):
+ Engine.dispatch._clear()
+ Engine._has_events = False
+
+ def test_legacy_dbapi_error(self):
+ engine = engines.testing_engine()
+ canary = Mock()
+
+ event.listen(engine, "dbapi_error", canary)
+
+ with engine.connect() as conn:
+ try:
+ conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ assert False
+ except tsa.exc.DBAPIError as e:
+ eq_(canary.mock_calls[0][1][5], e.orig)
+ eq_(canary.mock_calls[0][1][2], "SELECT FOO FROM I_DONT_EXIST")
+
+ def test_legacy_dbapi_error_no_ad_hoc_context(self):
+ engine = engines.testing_engine()
+
+ listener = Mock(return_value=None)
+ event.listen(engine, 'dbapi_error', listener)
+
+ nope = Exception("nope")
+ class MyType(TypeDecorator):
+ impl = Integer
+ def process_bind_param(self, value, dialect):
+ raise nope
+
+ with engine.connect() as conn:
+ assert_raises_message(
+ tsa.exc.StatementError,
+ r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
+ conn.execute,
+ select([1]).where(
+ column('foo') == literal('bar', MyType()))
+ )
+ # no legacy event
+ eq_(listener.mock_calls, [])
+
+ def test_legacy_dbapi_error_non_dbapi_error(self):
+ engine = engines.testing_engine()
+
+ listener = Mock(return_value=None)
+ event.listen(engine, 'dbapi_error', listener)
+
+ nope = TypeError("I'm not a DBAPI error")
+ with engine.connect() as c:
+ c.connection.cursor = Mock(
+ return_value=Mock(
+ execute=Mock(
+ side_effect=nope
+ ))
+ )
+
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ c.execute, "select "
+ )
+ # no legacy event
+ eq_(listener.mock_calls, [])
+
+
+ def test_handle_error(self):
+ engine = engines.testing_engine()
+ canary = Mock(return_value=None)
+
+ event.listen(engine, "handle_error", canary)
+
+ with engine.connect() as conn:
+ try:
+ conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ assert False
+ except tsa.exc.DBAPIError as e:
+ ctx = canary.mock_calls[0][1][0]
+
+ eq_(ctx.original_exception, e.orig)
+ is_(ctx.sqlalchemy_exception, e)
+ eq_(ctx.statement, "SELECT FOO FROM I_DONT_EXIST")
+
+ def test_exception_event_reraise(self):
+ engine = engines.testing_engine()
+
+ class MyException(Exception):
+ pass
+
+ @event.listens_for(engine, 'handle_error', retval=True)
+ def err(context):
+ stmt = context.statement
+ exception = context.original_exception
+ if "ERROR ONE" in str(stmt):
+ return MyException("my exception")
+ elif "ERROR TWO" in str(stmt):
+ return exception
+ else:
+ return None
+
+ conn = engine.connect()
+ # case 1: custom exception
+ assert_raises_message(
+ MyException,
+ "my exception",
+ conn.execute, "SELECT 'ERROR ONE' FROM I_DONT_EXIST"
+ )
+ # case 2: return the DBAPI exception we're given;
+ # no wrapping should occur
+ assert_raises(
+ conn.dialect.dbapi.Error,
+ conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST"
+ )
+ # case 3: normal wrapping
+ assert_raises(
+ tsa.exc.DBAPIError,
+ conn.execute, "SELECT 'ERROR THREE' FROM I_DONT_EXIST"
+ )
+
+ def test_exception_event_reraise_chaining(self):
+ engine = engines.testing_engine()
+
+ class MyException1(Exception):
+ pass
+
+ class MyException2(Exception):
+ pass
+
+ class MyException3(Exception):
+ pass
+
+ @event.listens_for(engine, 'handle_error', retval=True)
+ def err1(context):
+ stmt = context.statement
+
+ if "ERROR ONE" in str(stmt) or "ERROR TWO" in str(stmt) \
+ or "ERROR THREE" in str(stmt):
+ return MyException1("my exception")
+ elif "ERROR FOUR" in str(stmt):
+ raise MyException3("my exception short circuit")
+
+ @event.listens_for(engine, 'handle_error', retval=True)
+ def err2(context):
+ stmt = context.statement
+ if ("ERROR ONE" in str(stmt) or "ERROR FOUR" in str(stmt)) \
+ and isinstance(context.chained_exception, MyException1):
+ raise MyException2("my exception chained")
+ elif "ERROR TWO" in str(stmt):
+ return context.chained_exception
+ else:
+ return None
+
+ conn = engine.connect()
+
+ with patch.object(engine.
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
+ assert_raises_message(
+ MyException2,
+ "my exception chained",
+ conn.execute, "SELECT 'ERROR ONE' FROM I_DONT_EXIST"
+ )
+ eq_(patched.call_count, 1)
+
+ with patch.object(engine.
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
+ assert_raises(
+ MyException1,
+ conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST"
+ )
+ eq_(patched.call_count, 1)
+
+ with patch.object(engine.
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
+ # test that non None from err1 isn't cancelled out
+ # by err2
+ assert_raises(
+ MyException1,
+ conn.execute, "SELECT 'ERROR THREE' FROM I_DONT_EXIST"
+ )
+ eq_(patched.call_count, 1)
+
+ with patch.object(engine.
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
+ assert_raises(
+ tsa.exc.DBAPIError,
+ conn.execute, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST"
+ )
+ eq_(patched.call_count, 1)
+
+ with patch.object(engine.
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
+ assert_raises_message(
+ MyException3,
+ "my exception short circuit",
+ conn.execute, "SELECT 'ERROR FOUR' FROM I_DONT_EXIST"
+ )
+ eq_(patched.call_count, 1)
+
+ def test_exception_event_ad_hoc_context(self):
+ """test that handle_error is called with a context in
+ cases where _handle_dbapi_error() is normally called without
+ any context.
+
+ """
+
+ engine = engines.testing_engine()
+
+ listener = Mock(return_value=None)
+ event.listen(engine, 'handle_error', listener)
+
+ nope = Exception("nope")
+ class MyType(TypeDecorator):
+ impl = Integer
+ def process_bind_param(self, value, dialect):
+ raise nope
+
+ with engine.connect() as conn:
+ assert_raises_message(
+ tsa.exc.StatementError,
+ r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
+ conn.execute,
+ select([1]).where(
+ column('foo') == literal('bar', MyType()))
+ )
+
+ ctx = listener.mock_calls[0][1][0]
+ assert ctx.statement.startswith("SELECT 1 ")
+ is_(ctx.is_disconnect, False)
+ is_(ctx.original_exception, nope)
+
+ def test_exception_event_non_dbapi_error(self):
+ """test that dbapi_error is called with a context in
+ cases where DBAPI raises an exception that is not a DBAPI
+ exception, e.g. internal errors or encoding problems.
+
+ """
+ engine = engines.testing_engine()
+
+ listener = Mock(return_value=None)
+ event.listen(engine, 'handle_error', listener)
+
+ nope = TypeError("I'm not a DBAPI error")
+ with engine.connect() as c:
+ c.connection.cursor = Mock(
+ return_value=Mock(
+ execute=Mock(
+ side_effect=nope
+ ))
+ )
+
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ c.execute, "select "
+ )
+ ctx = listener.mock_calls[0][1][0]
+ eq_(ctx.statement, "select ")
+ is_(ctx.is_disconnect, False)
+ is_(ctx.original_exception, nope)
class ProxyConnectionTest(fixtures.TestBase):
"""These are the same tests as EngineEventsTest, except using