diff options
Diffstat (limited to 'test/engine/test_execute.py')
| -rw-r--r-- | test/engine/test_execute.py | 332 |
1 files changed, 0 insertions, 332 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 66903bef3..76d60f207 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -874,184 +874,6 @@ class MockStrategyTest(fixtures.TestBase): ) -class ResultProxyTest(fixtures.TestBase): - __backend__ = True - - def test_nontuple_row(self): - """ensure the C version of BaseRowProxy handles - duck-type-dependent rows.""" - - from sqlalchemy.engine import RowProxy - - class MyList(object): - - def __init__(self, l): - self.l = l - - def __len__(self): - return len(self.l) - - def __getitem__(self, i): - return list.__getitem__(self.l, i) - - proxy = RowProxy(object(), MyList(['value']), [None], { - 'key': (None, None, 0), 0: (None, None, 0)}) - eq_(list(proxy), ['value']) - eq_(proxy[0], 'value') - eq_(proxy['key'], 'value') - - @testing.provide_metadata - def test_no_rowcount_on_selects_inserts(self): - """assert that rowcount is only called on deletes and updates. - - This because cursor.rowcount may can be expensive on some dialects - such as Firebird, however many dialects require it be called - before the cursor is closed. - - """ - - metadata = self.metadata - - engine = engines.testing_engine() - - t = Table('t1', metadata, - Column('data', String(10)) - ) - metadata.create_all(engine) - - with patch.object( - engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: - mock_rowcount.__get__ = Mock() - engine.execute(t.insert(), - {'data': 'd1'}, - {'data': 'd2'}, - {'data': 'd3'}) - - eq_(len(mock_rowcount.__get__.mock_calls), 0) - - eq_( - engine.execute(t.select()).fetchall(), - [('d1', ), ('d2', ), ('d3', )] - ) - eq_(len(mock_rowcount.__get__.mock_calls), 0) - - engine.execute(t.update(), {'data': 'd4'}) - - eq_(len(mock_rowcount.__get__.mock_calls), 1) - - engine.execute(t.delete()) - eq_(len(mock_rowcount.__get__.mock_calls), 2) - - def test_rowproxy_is_sequence(self): - import collections - from sqlalchemy.engine import RowProxy - - row = RowProxy( - object(), ['value'], [None], - {'key': (None, None, 0), 0: (None, None, 0)}) - assert isinstance(row, collections.Sequence) - - @testing.provide_metadata - def test_rowproxy_getitem_indexes_compiled(self): - values = Table('users', self.metadata, - Column('key', String(10), primary_key=True), - Column('value', String(10))) - values.create() - - testing.db.execute(values.insert(), dict(key='One', value='Uno')) - row = testing.db.execute(values.select()).first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - eq_(row[1:0:-1], ('Uno',)) - - def test_rowproxy_getitem_indexes_raw(self): - row = testing.db.execute("select 'One' as key, 'Uno' as value").first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - eq_(row[1:0:-1], ('Uno',)) - - @testing.requires.cextensions - def test_row_c_sequence_check(self): - import csv - - metadata = MetaData() - metadata.bind = 'sqlite://' - users = Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(40)), - ) - users.create() - - users.insert().execute(name='Test') - row = users.select().execute().fetchone() - - s = util.StringIO() - writer = csv.writer(s) - # csv performs PySequenceCheck call - writer.writerow(row) - assert s.getvalue().strip() == '1,Test' - - @testing.requires.selectone - def test_empty_accessors(self): - statements = [ - ( - "select 1", - [ - lambda r: r.last_inserted_params(), - lambda r: r.last_updated_params(), - lambda r: r.prefetch_cols(), - lambda r: r.postfetch_cols(), - lambda r: r.inserted_primary_key - ], - "Statement is not a compiled expression construct." - ), - ( - select([1]), - [ - lambda r: r.last_inserted_params(), - lambda r: r.inserted_primary_key - ], - r"Statement is not an insert\(\) expression construct." - ), - ( - select([1]), - [ - lambda r: r.last_updated_params(), - ], - r"Statement is not an update\(\) expression construct." - ), - ( - select([1]), - [ - lambda r: r.prefetch_cols(), - lambda r: r.postfetch_cols() - ], - r"Statement is not an insert\(\) " - r"or update\(\) expression construct." - ), - ] - - for stmt, meths, msg in statements: - r = testing.db.execute(stmt) - try: - for meth in meths: - assert_raises_message( - tsa.exc.InvalidRequestError, - msg, - meth, r - ) - - finally: - r.close() - class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): __requires__ = 'schemas', __backend__ = True @@ -1260,160 +1082,6 @@ class ExecutionOptionsTest(fixtures.TestBase): ) -class AlternateResultProxyTest(fixtures.TablesTest): - __requires__ = ('sqlite', ) - - @classmethod - def setup_bind(cls): - cls.engine = engine = testing_engine('sqlite://') - return engine - - @classmethod - def define_tables(cls, metadata): - Table( - 'test', metadata, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) - - @classmethod - def insert_data(cls): - cls.engine.execute(cls.tables.test.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(1, 12) - ]) - - @contextmanager - def _proxy_fixture(self, cls): - self.table = self.tables.test - - class ExcCtx(default.DefaultExecutionContext): - - def get_result_proxy(self): - return cls(self) - self.patcher = patch.object( - self.engine.dialect, "execution_ctx_cls", ExcCtx) - with self.patcher: - yield - - def _test_proxy(self, cls): - with self._proxy_fixture(cls): - rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 - - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) - - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) - - eq_(r.fetchmany(2), []) - - eq_(r.fetchall(), []) - - eq_(r.fetchone(), None) - - # until we close - r.close() - - self._assert_result_closed(r) - - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) - - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) - - def _assert_result_closed(self, r): - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchone - ) - - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchmany, 2 - ) - - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchall - ) - - def test_plain(self): - self._test_proxy(_result.ResultProxy) - - def test_buffered_row_result_proxy(self): - self._test_proxy(_result.BufferedRowResultProxy) - - def test_fully_buffered_result_proxy(self): - self._test_proxy(_result.FullyBufferedResultProxy) - - def test_buffered_column_result_proxy(self): - self._test_proxy(_result.BufferedColumnResultProxy) - - def test_buffered_row_growth(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): - with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) - result = conn.execute(self.table.select()) - checks = { - 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, - 1351: 1000 - } - for idx, row in enumerate(result, 0): - if idx in checks: - eq_(result._bufsize, checks[idx]) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 1000 - ) - - def test_max_row_buffer_option(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): - with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) - result = conn.execution_options(max_row_buffer=27).execute( - self.table.select() - ) - for idx, row in enumerate(result, 0): - if idx in (16, 70, 150, 250): - eq_(result._bufsize, 27) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 27 - ) - - class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', __backend__ = True |
