summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-08-29 11:22:46 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2010-08-29 11:22:46 -0400
commit1149197a36f01dae1f8da82b7cfb73a7777e7a4a (patch)
tree3b2cacd6c9fc75c51862eb886420036b7cd08c47 /test/engine/test_execute.py
parenta635750213c346a895e417ae8e629ce924d557e8 (diff)
downloadsqlalchemy-1149197a36f01dae1f8da82b7cfb73a7777e7a4a.tar.gz
- moved out to on_before_execute, on_after_execute. not much option here,
need both forms, the wrapping thing is just silly - fixed the listen() to not re-wrap continuously.
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py95
1 files changed, 12 insertions, 83 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 2c6caf87f..d85279981 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -305,13 +305,13 @@ class EngineEventsTest(TestBase):
break
@testing.fails_on('firebird', 'Data type unknown')
- def test_execute_events_raw(self):
+ def test_execute_events(self):
stmts = []
cursor_stmts = []
- def execute(conn, clauseelement, *multiparams,
- **params ):
+ def execute(conn, clauseelement, multiparams,
+ params ):
stmts.append((str(clauseelement), params, multiparams))
def cursor_execute(conn, cursor, statement, parameters,
@@ -324,8 +324,8 @@ class EngineEventsTest(TestBase):
engines.testing_engine(options=dict(implicit_returning=False,
strategy='threadlocal'))
]:
- event.listen(execute, 'on_execute', engine)
- event.listen(cursor_execute, 'on_cursor_execute', engine)
+ event.listen(execute, 'on_before_execute', engine)
+ event.listen(cursor_execute, 'on_before_cursor_execute', engine)
m = MetaData(engine)
t1 = Table('t1', m,
@@ -375,78 +375,7 @@ class EngineEventsTest(TestBase):
self._assert_stmts(compiled, stmts)
self._assert_stmts(cursor, cursor_stmts)
- @testing.fails_on('firebird', 'Data type unknown')
- def _broken_test_execute_events_generic(self):
-
- stmts = []
- cursor_stmts = []
-
- def listen(event_name, args):
- if event_name == 'on_execute':
- clauseelement, params, multiparams = \
- args['clauseelement'], args['params'], args['multiparams']
- stmts.append((str(clauseelement), params, multiparams))
- elif event_name == 'on_cursor_execute':
- statement, parameters = args['statement'], args['parameters']
- cursor_stmts.append((str(statement), parameters, None))
-
- for engine in [
- engines.testing_engine(options=dict(implicit_returning=False)),
- engines.testing_engine(options=dict(implicit_returning=False,
- strategy='threadlocal'))
- ]:
- event.listen(listen, 'on_execute', engine)
- event.listen(listen, 'on_cursor_execute', engine)
-
- m = MetaData(engine)
- t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(50), default=func.lower('Foo'),
- primary_key=True)
- )
- m.create_all()
- try:
- t1.insert().execute(c1=5, c2='some data')
- t1.insert().execute(c1=6)
- eq_(engine.execute('select * from t1').fetchall(), [(5,
- 'some data'), (6, 'foo')])
- finally:
- m.drop_all()
- engine.dispose()
- compiled = [('CREATE TABLE t1', {}, None),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
- 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
- {'c1': 6}, None), ('select * from t1', {},
- None), ('DROP TABLE t1', {}, None)]
- if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
- # eexecute_pk_sequence
- # s:
- cursor = [
- ('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
- : 5}, (5, 'some data')),
- ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
- (6, 'foo')),
- ('select * from t1', {}, ()),
- ('DROP TABLE t1', {}, ()),
- ]
- else:
- insert2_params = 6, 'Foo'
- if testing.against('oracle+zxjdbc'):
- insert2_params += (ReturningParam(12), )
- cursor = [('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
- , 'c1': 5}, (5, 'some data')),
- ('INSERT INTO t1 (c1, c2)', {'c1': 6,
- 'lower_2': 'Foo'}, insert2_params),
- ('select * from t1', {}, ()), ('DROP TABLE t1'
- , {}, ())] # bind param name 'lower_2' might
- # be incorrect
- self._assert_stmts(compiled, stmts)
- self._assert_stmts(cursor, cursor_stmts)
-
- def test_options_raw(self):
+ def test_options(self):
track = []
def on_execute(conn, *args, **kw):
track.append('execute')
@@ -455,8 +384,8 @@ class EngineEventsTest(TestBase):
track.append('cursor_execute')
engine = engines.testing_engine()
- event.listen(on_execute, 'on_execute', engine)
- event.listen(on_cursor_execute, 'on_cursor_execute', engine)
+ event.listen(on_execute, 'on_before_execute', engine)
+ event.listen(on_cursor_execute, 'on_before_cursor_execute', engine)
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
eq_(c2._execution_options, {'foo':'bar'})
@@ -466,7 +395,7 @@ class EngineEventsTest(TestBase):
eq_(track, ['execute', 'cursor_execute'])
- def test_transactional_raw(self):
+ def test_transactional(self):
track = []
def tracker(name):
def go(conn, *args, **kw):
@@ -474,8 +403,8 @@ class EngineEventsTest(TestBase):
return go
engine = engines.testing_engine()
- event.listen(tracker('execute'), 'on_execute', engine)
- event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine)
+ event.listen(tracker('execute'), 'on_before_execute', engine)
+ event.listen(tracker('cursor_execute'), 'on_before_cursor_execute', engine)
event.listen(tracker('begin'), 'on_begin', engine)
event.listen(tracker('commit'), 'on_commit', engine)
event.listen(tracker('rollback'), 'on_rollback', engine)
@@ -495,7 +424,7 @@ class EngineEventsTest(TestBase):
@testing.requires.savepoints
@testing.requires.two_phase_transactions
- def test_transactional_advanced_raw(self):
+ def test_transactional_advanced(self):
track = []
def tracker(name):
def go(conn, exec_, *args, **kw):