diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-08-29 11:22:46 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-08-29 11:22:46 -0400 |
| commit | 1149197a36f01dae1f8da82b7cfb73a7777e7a4a (patch) | |
| tree | 3b2cacd6c9fc75c51862eb886420036b7cd08c47 /test/engine/test_execute.py | |
| parent | a635750213c346a895e417ae8e629ce924d557e8 (diff) | |
| download | sqlalchemy-1149197a36f01dae1f8da82b7cfb73a7777e7a4a.tar.gz | |
- moved out to on_before_execute, on_after_execute. not much option here,
need both forms, the wrapping thing is just silly
- fixed the listen() to not re-wrap continuously.
Diffstat (limited to 'test/engine/test_execute.py')
| -rw-r--r-- | test/engine/test_execute.py | 95 |
1 files changed, 12 insertions, 83 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 2c6caf87f..d85279981 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -305,13 +305,13 @@ class EngineEventsTest(TestBase): break @testing.fails_on('firebird', 'Data type unknown') - def test_execute_events_raw(self): + def test_execute_events(self): stmts = [] cursor_stmts = [] - def execute(conn, clauseelement, *multiparams, - **params ): + def execute(conn, clauseelement, multiparams, + params ): stmts.append((str(clauseelement), params, multiparams)) def cursor_execute(conn, cursor, statement, parameters, @@ -324,8 +324,8 @@ class EngineEventsTest(TestBase): engines.testing_engine(options=dict(implicit_returning=False, strategy='threadlocal')) ]: - event.listen(execute, 'on_execute', engine) - event.listen(cursor_execute, 'on_cursor_execute', engine) + event.listen(execute, 'on_before_execute', engine) + event.listen(cursor_execute, 'on_before_cursor_execute', engine) m = MetaData(engine) t1 = Table('t1', m, @@ -375,78 +375,7 @@ class EngineEventsTest(TestBase): self._assert_stmts(compiled, stmts) self._assert_stmts(cursor, cursor_stmts) - @testing.fails_on('firebird', 'Data type unknown') - def _broken_test_execute_events_generic(self): - - stmts = [] - cursor_stmts = [] - - def listen(event_name, args): - if event_name == 'on_execute': - clauseelement, params, multiparams = \ - args['clauseelement'], args['params'], args['multiparams'] - stmts.append((str(clauseelement), params, multiparams)) - elif event_name == 'on_cursor_execute': - statement, parameters = args['statement'], args['parameters'] - cursor_stmts.append((str(statement), parameters, None)) - - for engine in [ - engines.testing_engine(options=dict(implicit_returning=False)), - engines.testing_engine(options=dict(implicit_returning=False, - strategy='threadlocal')) - ]: - event.listen(listen, 'on_execute', engine) - event.listen(listen, 'on_cursor_execute', engine) - - m = MetaData(engine) - t1 = Table('t1', m, - Column('c1', Integer, primary_key=True), - Column('c2', String(50), default=func.lower('Foo'), - primary_key=True) - ) - m.create_all() - try: - t1.insert().execute(c1=5, c2='some data') - t1.insert().execute(c1=6) - eq_(engine.execute('select * from t1').fetchall(), [(5, - 'some data'), (6, 'foo')]) - finally: - m.drop_all() - engine.dispose() - compiled = [('CREATE TABLE t1', {}, None), - ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', - 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', - {'c1': 6}, None), ('select * from t1', {}, - None), ('DROP TABLE t1', {}, None)] - if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr - # eexecute_pk_sequence - # s: - cursor = [ - ('CREATE TABLE t1', {}, ()), - ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1' - : 5}, (5, 'some data')), - ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )), - ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6}, - (6, 'foo')), - ('select * from t1', {}, ()), - ('DROP TABLE t1', {}, ()), - ] - else: - insert2_params = 6, 'Foo' - if testing.against('oracle+zxjdbc'): - insert2_params += (ReturningParam(12), ) - cursor = [('CREATE TABLE t1', {}, ()), - ('INSERT INTO t1 (c1, c2)', {'c2': 'some data' - , 'c1': 5}, (5, 'some data')), - ('INSERT INTO t1 (c1, c2)', {'c1': 6, - 'lower_2': 'Foo'}, insert2_params), - ('select * from t1', {}, ()), ('DROP TABLE t1' - , {}, ())] # bind param name 'lower_2' might - # be incorrect - self._assert_stmts(compiled, stmts) - self._assert_stmts(cursor, cursor_stmts) - - def test_options_raw(self): + def test_options(self): track = [] def on_execute(conn, *args, **kw): track.append('execute') @@ -455,8 +384,8 @@ class EngineEventsTest(TestBase): track.append('cursor_execute') engine = engines.testing_engine() - event.listen(on_execute, 'on_execute', engine) - event.listen(on_cursor_execute, 'on_cursor_execute', engine) + event.listen(on_execute, 'on_before_execute', engine) + event.listen(on_cursor_execute, 'on_before_cursor_execute', engine) conn = engine.connect() c2 = conn.execution_options(foo='bar') eq_(c2._execution_options, {'foo':'bar'}) @@ -466,7 +395,7 @@ class EngineEventsTest(TestBase): eq_(track, ['execute', 'cursor_execute']) - def test_transactional_raw(self): + def test_transactional(self): track = [] def tracker(name): def go(conn, *args, **kw): @@ -474,8 +403,8 @@ class EngineEventsTest(TestBase): return go engine = engines.testing_engine() - event.listen(tracker('execute'), 'on_execute', engine) - event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine) + event.listen(tracker('execute'), 'on_before_execute', engine) + event.listen(tracker('cursor_execute'), 'on_before_cursor_execute', engine) event.listen(tracker('begin'), 'on_begin', engine) event.listen(tracker('commit'), 'on_commit', engine) event.listen(tracker('rollback'), 'on_rollback', engine) @@ -495,7 +424,7 @@ class EngineEventsTest(TestBase): @testing.requires.savepoints @testing.requires.two_phase_transactions - def test_transactional_advanced_raw(self): + def test_transactional_advanced(self): track = [] def tracker(name): def go(conn, exec_, *args, **kw): |
