diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
| commit | 45cec095b4904ba71425d2fe18c143982dd08f43 (patch) | |
| tree | af5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/query.py | |
| parent | 698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff) | |
| download | sqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz | |
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run
the tests. [ticket:970]
Diffstat (limited to 'test/sql/query.py')
| -rw-r--r-- | test/sql/query.py | 1321 |
1 files changed, 0 insertions, 1321 deletions
diff --git a/test/sql/query.py b/test/sql/query.py deleted file mode 100644 index b428d8991..000000000 --- a/test/sql/query.py +++ /dev/null @@ -1,1321 +0,0 @@ -import testenv; testenv.configure_for_tests() -import datetime -from sqlalchemy import * -from sqlalchemy import exc, sql -from sqlalchemy.engine import default -from testlib import * -from testlib.testing import eq_ - -class QueryTest(TestBase): - - def setUpAll(self): - global users, users2, addresses, metadata - metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20)), - ) - addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True), - Column('user_id', Integer, ForeignKey('query_users.user_id')), - Column('address', String(30))) - - users2 = Table('u2', metadata, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20)), - ) - metadata.create_all() - - def tearDown(self): - addresses.delete().execute() - users.delete().execute() - users2.delete().execute() - - def tearDownAll(self): - metadata.drop_all() - - def test_insert(self): - users.insert().execute(user_id = 7, user_name = 'jack') - assert users.count().scalar() == 1 - - def test_insert_heterogeneous_params(self): - users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9} - ) - assert users.select().execute().fetchall() == [(7, 'jack'), (8, 'ed'), (9, None)] - - def test_update(self): - users.insert().execute(user_id = 7, user_name = 'jack') - assert users.count().scalar() == 1 - - users.update(users.c.user_id == 7).execute(user_name = 'fred') - assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred' - - def test_lastrow_accessor(self): - """Tests the last_inserted_ids() and lastrow_has_id() functions.""" - - def insert_values(table, values): - """ - Inserts a row into a table, returns the full list of values - INSERTed including defaults that fired off on the DB side and - detects rows that had defaults and post-fetches. - """ - - result = table.insert().execute(**values) - ret = values.copy() - - for col, id in zip(table.primary_key, result.last_inserted_ids()): - ret[col.key] = id - - if result.lastrow_has_defaults(): - criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())]) - row = table.select(criterion).execute().fetchone() - for c in table.c: - ret[c.key] = row[c] - return ret - - for supported, table, values, assertvalues in [ - ( - {'unsupported':['sqlite']}, - Table("t1", metadata, - Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True)), - {'foo':'hi'}, - {'id':1, 'foo':'hi'} - ), - ( - {'unsupported':['sqlite']}, - Table("t2", metadata, - Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo':'hi'}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t3", metadata, - Column("id", String(40), primary_key=True), - Column('foo', String(30), primary_key=True), - Column("bar", String(30)) - ), - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} - ), - ( - {'unsupported':[]}, - Table("t4", metadata, - Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo':'hi', 'id':1}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t5", metadata, - Column('id', String(10), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'id':'id1'}, - {'id':'id1', 'bar':'hi'}, - ), - ]: - if testing.db.name in supported['unsupported']: - continue - try: - table.create() - i = insert_values(table, values) - assert i == assertvalues, repr(i) + " " + repr(assertvalues) - finally: - table.drop() - - def test_row_iteration(self): - users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, - ) - r = users.select().execute() - l = [] - for row in r: - l.append(row) - self.assert_(len(l) == 3) - - @testing.fails_on('firebird', 'Data type unknown') - @testing.requires.subqueries - def test_anonymous_rows(self): - users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, - ) - - sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar() - for row in select([sel + 1, sel + 3], bind=users.bind).execute(): - assert row['anon_1'] == 8 - assert row['anon_2'] == 10 - - def test_order_by_label(self): - """test that a label within an ORDER BY works on each backend. - - simple labels in ORDER BYs now render as the actual labelname - which not every database supports. - - """ - users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, - ) - - concat = ("test: " + users.c.user_name).label('thedata') - self.assertEquals( - select([concat]).order_by(concat).execute().fetchall(), - [("test: ed",), ("test: fred",), ("test: jack",)] - ) - - concat = ("test: " + users.c.user_name).label('thedata') - self.assertEquals( - select([concat]).order_by(desc(concat)).execute().fetchall(), - [("test: jack",), ("test: fred",), ("test: ed",)] - ) - - concat = ("test: " + users.c.user_name).label('thedata') - self.assertEquals( - select([concat]).order_by(concat + "x").execute().fetchall(), - [("test: ed",), ("test: fred",), ("test: jack",)] - ) - - - def test_row_comparison(self): - users.insert().execute(user_id = 7, user_name = 'jack') - rp = users.select().execute().fetchone() - - self.assert_(rp == rp) - self.assert_(not(rp != rp)) - - equal = (7, 'jack') - - self.assert_(rp == equal) - self.assert_(equal == rp) - self.assert_(not (rp != equal)) - self.assert_(not (equal != equal)) - - @testing.fails_on('mssql', 'No support for boolean logic in column select.') - @testing.fails_on('oracle', 'FIXME: unknown') - def test_or_and_as_columns(self): - true, false = literal(True), literal(False) - - self.assertEquals(testing.db.execute(select([and_(true, false)])).scalar(), False) - self.assertEquals(testing.db.execute(select([and_(true, true)])).scalar(), True) - self.assertEquals(testing.db.execute(select([or_(true, false)])).scalar(), True) - self.assertEquals(testing.db.execute(select([or_(false, false)])).scalar(), False) - self.assertEquals(testing.db.execute(select([not_(or_(false, false))])).scalar(), True) - - row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone() - assert row.x == False - assert row.y == False - - row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone() - assert row.x == True - assert row.y == False - - def test_fetchmany(self): - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'ed') - users.insert().execute(user_id = 9, user_name = 'fred') - r = users.select().execute() - l = [] - for row in r.fetchmany(size=2): - l.append(row) - self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l)) - - def test_like_ops(self): - users.insert().execute( - {'user_id':1, 'user_name':'apples'}, - {'user_id':2, 'user_name':'oranges'}, - {'user_id':3, 'user_name':'bananas'}, - {'user_id':4, 'user_name':'legumes'}, - {'user_id':5, 'user_name':'hi % there'}, - ) - - for expr, result in ( - (select([users.c.user_id]).where(users.c.user_name.startswith('apple')), [(1,)]), - (select([users.c.user_id]).where(users.c.user_name.contains('i % t')), [(5,)]), - (select([users.c.user_id]).where(users.c.user_name.endswith('anas')), [(3,)]), - ): - eq_(expr.execute().fetchall(), result) - - - @testing.emits_warning('.*now automatically escapes.*') - def test_percents_in_text(self): - for expr, result in ( - (text("select 6 % 10"), 6), - (text("select 17 % 10"), 7), - (text("select '%'"), '%'), - (text("select '%%'"), '%%'), - (text("select '%%%'"), '%%%'), - (text("select 'hello % world'"), "hello % world") - ): - eq_(testing.db.scalar(expr), result) - - def test_ilike(self): - users.insert().execute( - {'user_id':1, 'user_name':'one'}, - {'user_id':2, 'user_name':'TwO'}, - {'user_id':3, 'user_name':'ONE'}, - {'user_id':4, 'user_name':'OnE'}, - ) - - self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )]) - - self.assertEquals(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) - - if testing.against('postgres'): - self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) - self.assertEquals(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) - - - def test_compiled_execute(self): - users.insert().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() - c = testing.db.connect() - assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 - - def test_compiled_insert_execute(self): - users.insert().compile().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() - c = testing.db.connect() - assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 - - def test_repeated_bindparams(self): - """Tests that a BindParam can be used more than once. - - This should be run for DB-APIs with both positional and named - paramstyles. - """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - - u = bindparam('userid') - s = users.select(and_(users.c.user_name==u, users.c.user_name==u)) - r = s.execute(userid='fred').fetchall() - assert len(r) == 1 - - def test_bindparam_shortname(self): - """test the 'shortname' field on BindParamClause.""" - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - u = bindparam('userid', shortname='someshortname') - s = users.select(users.c.user_name==u) - r = s.execute(someshortname='fred').fetchall() - assert len(r) == 1 - - def test_bindparam_detection(self): - dialect = default.DefaultDialect(paramstyle='qmark') - prep = lambda q: str(sql.text(q).compile(dialect=dialect)) - - def a_eq(got, wanted): - if got != wanted: - print "Wanted %s" % wanted - print "Received %s" % got - self.assert_(got == wanted, got) - - a_eq(prep('select foo'), 'select foo') - a_eq(prep("time='12:30:00'"), "time='12:30:00'") - a_eq(prep(u"time='12:30:00'"), u"time='12:30:00'") - a_eq(prep(":this:that"), ":this:that") - a_eq(prep(":this :that"), "? ?") - a_eq(prep("(:this),(:that :other)"), "(?),(? ?)") - a_eq(prep("(:this),(:that:other)"), "(?),(:that:other)") - a_eq(prep("(:this),(:that,:other)"), "(?),(?,?)") - a_eq(prep("(:that_:other)"), "(:that_:other)") - a_eq(prep("(:that_ :other)"), "(? ?)") - a_eq(prep("(:that_other)"), "(?)") - a_eq(prep("(:that$other)"), "(?)") - a_eq(prep("(:that$:other)"), "(:that$:other)") - a_eq(prep(".:that$ :other."), ".? ?.") - - a_eq(prep(r'select \foo'), r'select \foo') - a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'") - a_eq(prep(":this \:that"), "? :that") - a_eq(prep(r"(\:that$other)"), "(:that$other)") - a_eq(prep(r".\:that$ :other."), ".:that$ ?.") - - def test_delete(self): - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - print repr(users.select().execute().fetchall()) - - users.delete(users.c.user_name == 'fred').execute() - - print repr(users.select().execute().fetchall()) - - - - @testing.exclude('mysql', '<', (5, 0, 37), 'database bug') - def test_scalar_select(self): - """test that scalar subqueries with labels get their type propagated to the result set.""" - # mysql and/or mysqldb has a bug here, type isn't propagated for scalar - # subquery. - datetable = Table('datetable', metadata, - Column('id', Integer, primary_key=True), - Column('today', DateTime)) - datetable.create() - try: - datetable.insert().execute(id=1, today=datetime.datetime(2006, 5, 12, 12, 0, 0)) - s = select([datetable.alias('x').c.today]).as_scalar() - s2 = select([datetable.c.id, s.label('somelabel')]) - #print s2.c.somelabel.type - assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime) - finally: - datetable.drop() - - def test_order_by(self): - """Exercises ORDER BY clause generation. - - Tests simple, compound, aliased and DESC clauses. - """ - - users.insert().execute(user_id=1, user_name='c') - users.insert().execute(user_id=2, user_name='b') - users.insert().execute(user_id=3, user_name='a') - - def a_eq(executable, wanted): - got = list(executable.execute()) - self.assertEquals(got, wanted) - - for labels in False, True: - a_eq(users.select(order_by=[users.c.user_id], - use_labels=labels), - [(1, 'c'), (2, 'b'), (3, 'a')]) - - a_eq(users.select(order_by=[users.c.user_name, users.c.user_id], - use_labels=labels), - [(3, 'a'), (2, 'b'), (1, 'c')]) - - a_eq(select([users.c.user_id.label('foo')], - use_labels=labels, - order_by=[users.c.user_id]), - [(1,), (2,), (3,)]) - - a_eq(select([users.c.user_id.label('foo'), users.c.user_name], - use_labels=labels, - order_by=[users.c.user_name, users.c.user_id]), - [(3, 'a'), (2, 'b'), (1, 'c')]) - - a_eq(users.select(distinct=True, - use_labels=labels, - order_by=[users.c.user_id]), - [(1, 'c'), (2, 'b'), (3, 'a')]) - - a_eq(select([users.c.user_id.label('foo')], - distinct=True, - use_labels=labels, - order_by=[users.c.user_id]), - [(1,), (2,), (3,)]) - - a_eq(select([users.c.user_id.label('a'), - users.c.user_id.label('b'), - users.c.user_name], - use_labels=labels, - order_by=[users.c.user_id]), - [(1, 1, 'c'), (2, 2, 'b'), (3, 3, 'a')]) - - a_eq(users.select(distinct=True, - use_labels=labels, - order_by=[desc(users.c.user_id)]), - [(3, 'a'), (2, 'b'), (1, 'c')]) - - a_eq(select([users.c.user_id.label('foo')], - distinct=True, - use_labels=labels, - order_by=[users.c.user_id.desc()]), - [(3,), (2,), (1,)]) - - def test_column_accessor(self): - users.insert().execute(user_id=1, user_name='john') - users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') - - r = users.select(users.c.user_id==2).execute().fetchone() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - # test slices - r = text("select * from query_addresses", bind=testing.db).execute().fetchone() - self.assert_(r[0:1] == (1,)) - self.assert_(r[1:] == (2, 'foo@bar.com')) - self.assert_(r[:-1] == (1, 2)) - - # test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone() - self.assert_(r['user_id']) == 1 - self.assert_(r['user_name']) == "john" - - # test using literal tablename.colname - r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone() - self.assert_(r['query_users.user_id']) == 1 - self.assert_(r['query_users.user_name']) == "john" - - def test_row_as_args(self): - users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id==1).execute().fetchone() - users.delete().execute() - users.insert().execute(r) - assert users.select().execute().fetchall() == [(1, 'john')] - - def test_result_as_args(self): - users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')]) - r = users.select().execute() - users2.insert().execute(list(r)) - assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')] - - users2.delete().execute() - r = users.select().execute() - users2.insert().execute(*list(r)) - assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')] - - def test_ambiguous_column(self): - users.insert().execute(user_id=1, user_name='john') - r = users.outerjoin(addresses).select().execute().fetchone() - try: - print r['user_id'] - assert False - except exc.InvalidRequestError, e: - assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \ - str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement." - - @testing.requires.subqueries - def test_column_label_targeting(self): - users.insert().execute(user_id=7, user_name='ed') - - for s in ( - users.select().alias('foo'), - users.select().alias(users.name), - ): - row = s.select(use_labels=True).execute().fetchone() - assert row[s.c.user_id] == 7 - assert row[s.c.user_name] == 'ed' - - def test_keys(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() - self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) - - def test_items(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() - self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')]) - - def test_len(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() - self.assertEqual(len(r), 2) - r.close() - r = testing.db.execute('select user_name, user_id from query_users').fetchone() - self.assertEqual(len(r), 2) - r.close() - r = testing.db.execute('select user_name from query_users').fetchone() - self.assertEqual(len(r), 1) - r.close() - - def test_cant_execute_join(self): - try: - users.join(addresses).execute() - except exc.ArgumentError, e: - assert str(e).startswith('Not an executable clause: ') - - - - def test_column_order_with_simple_query(self): - # should return values in column definition order - users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id==1).execute().fetchone() - self.assertEqual(r[0], 1) - self.assertEqual(r[1], 'foo') - self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) - self.assertEqual(r.values(), [1, 'foo']) - - def test_column_order_with_text_query(self): - # should return values in query order - users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users').fetchone() - self.assertEqual(r[0], 'foo') - self.assertEqual(r[1], 1) - self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id']) - self.assertEqual(r.values(), ['foo', 1]) - - @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') - @testing.crashes('firebird', 'An identifier must begin with a letter') - @testing.crashes('maxdb', 'FIXME: unknown, verify not fails_on()') - def test_column_accessor_shadow(self): - meta = MetaData(testing.db) - shadowed = Table('test_shadowed', meta, - Column('shadow_id', INT, primary_key = True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('__parent', VARCHAR(20)), - Column('__row', VARCHAR(20)), - ) - shadowed.create(checkfirst=True) - try: - shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone() - self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) - self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') - self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') - self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow') - self.assert_(r['__parent'] == 'Hidden parent') - self.assert_(r['__row'] == 'Hidden row') - try: - print r.__parent, r.__row - self.fail('Should not allow access to private attributes') - except AttributeError: - pass # expected - r.close() - finally: - shadowed.drop(checkfirst=True) - - def test_in_filtering(self): - """test the behavior of the in_() function.""" - - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) - - s = users.select(users.c.user_name.in_([])) - r = s.execute().fetchall() - # No username is in empty set - assert len(r) == 0 - - s = users.select(not_(users.c.user_name.in_([]))) - r = s.execute().fetchall() - # All usernames with a value are outside an empty set - assert len(r) == 2 - - s = users.select(users.c.user_name.in_(['jack','fred'])) - r = s.execute().fetchall() - assert len(r) == 2 - - s = users.select(not_(users.c.user_name.in_(['jack','fred']))) - r = s.execute().fetchall() - # Null values are not outside any set - assert len(r) == 0 - - u = bindparam('search_key') - - s = users.select(u.in_([])) - r = s.execute(search_key='john').fetchall() - assert len(r) == 0 - r = s.execute(search_key=None).fetchall() - assert len(r) == 0 - - s = users.select(not_(u.in_([]))) - r = s.execute(search_key='john').fetchall() - assert len(r) == 3 - r = s.execute(search_key=None).fetchall() - assert len(r) == 0 - - @testing.fails_on('firebird', 'FIXME: unknown') - @testing.fails_on('maxdb', 'FIXME: unknown') - @testing.fails_on('oracle', 'FIXME: unknown') - @testing.fails_on('mssql', 'FIXME: unknown') - def test_in_filtering_advanced(self): - """test the behavior of the in_() function when comparing against an empty collection.""" - - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) - - s = users.select(users.c.user_name.in_([]) == True) - r = s.execute().fetchall() - assert len(r) == 0 - s = users.select(users.c.user_name.in_([]) == False) - r = s.execute().fetchall() - assert len(r) == 2 - s = users.select(users.c.user_name.in_([]) == None) - r = s.execute().fetchall() - assert len(r) == 1 - -class PercentSchemaNamesTest(TestBase): - """tests using percent signs, spaces in table and column names. - - Doesn't pass for mysql, postgres, but this is really a - SQLAlchemy bug - we should be escaping out %% signs for this - operation the same way we do for text() and column labels. - - """ - @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') - def setUpAll(self): - global percent_table, metadata - metadata = MetaData(testing.db) - percent_table = Table('percent%table', metadata, - Column("percent%", Integer), - Column("%(oneofthese)s", Integer), - Column("spaces % more spaces", Integer), - ) - metadata.create_all() - - @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') - def tearDownAll(self): - metadata.drop_all() - - @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') - def test_roundtrip(self): - percent_table.insert().execute( - {'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12}, - ) - percent_table.insert().execute( - {'percent%':7, '%(oneofthese)s':8, 'spaces % more spaces':11}, - {'percent%':9, '%(oneofthese)s':9, 'spaces % more spaces':10}, - {'percent%':11, '%(oneofthese)s':10, 'spaces % more spaces':9}, - ) - - for table in (percent_table, percent_table.alias()): - eq_( - table.select().order_by(table.c['%(oneofthese)s']).execute().fetchall(), - [ - (5, 7, 12), - (7, 8, 11), - (9, 9, 10), - (11, 10, 9) - ] - ) - - eq_( - table.select(). - where(table.c['spaces % more spaces'].in_([9, 10])). - order_by(table.c['%(oneofthese)s']).execute().fetchall(), - [ - (9, 9, 10), - (11, 10, 9) - ] - ) - - result = table.select().order_by(table.c['%(oneofthese)s']).execute() - row = result.fetchone() - eq_(row[table.c['percent%']], 5) - eq_(row[table.c['%(oneofthese)s']], 7) - eq_(row[table.c['spaces % more spaces']], 12) - row = result.fetchone() - eq_(row['percent%'], 7) - eq_(row['%(oneofthese)s'], 8) - eq_(row['spaces % more spaces'], 11) - result.close() - - percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute() - - eq_( - percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(), - [ - (5, 9, 15), - (7, 9, 15), - (9, 9, 15), - (11, 9, 15) - ] - ) - - - -class LimitTest(TestBase): - - def setUpAll(self): - global users, addresses, metadata - metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20)), - ) - addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True), - Column('user_id', Integer, ForeignKey('query_users.user_id')), - Column('address', String(30))) - metadata.create_all() - self._data() - - def _data(self): - users.insert().execute(user_id=1, user_name='john') - addresses.insert().execute(address_id=1, user_id=1, address='addr1') - users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute(address_id=2, user_id=2, address='addr1') - users.insert().execute(user_id=3, user_name='ed') - addresses.insert().execute(address_id=3, user_id=3, address='addr2') - users.insert().execute(user_id=4, user_name='wendy') - addresses.insert().execute(address_id=4, user_id=4, address='addr3') - users.insert().execute(user_id=5, user_name='laura') - addresses.insert().execute(address_id=5, user_id=5, address='addr4') - users.insert().execute(user_id=6, user_name='ralph') - addresses.insert().execute(address_id=6, user_id=6, address='addr5') - users.insert().execute(user_id=7, user_name='fido') - addresses.insert().execute(address_id=7, user_id=7, address='addr5') - - def tearDownAll(self): - metadata.drop_all() - - def test_select_limit(self): - r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) - - @testing.fails_on('maxdb', 'FIXME: unknown') - def test_select_limit_offset(self): - """Test the interaction between limit and offset""" - - r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) - r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(6, 'ralph'), (7, 'fido')]) - - def test_select_distinct_limit(self): - """Test the interaction between limit and distinct""" - - r = sorted([x[0] for x in select([addresses.c.address]).distinct().limit(3).order_by(addresses.c.address).execute().fetchall()]) - self.assert_(len(r) == 3, repr(r)) - self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) - - @testing.fails_on('mssql', 'FIXME: unknown') - def test_select_distinct_offset(self): - """Test the interaction between distinct and offset""" - - r = sorted([x[0] for x in select([addresses.c.address]).distinct().offset(1).order_by(addresses.c.address).execute().fetchall()]) - self.assert_(len(r) == 4, repr(r)) - self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r)) - - def test_select_distinct_limit_offset(self): - """Test the interaction between limit and limit/offset""" - - r = select([addresses.c.address]).order_by(addresses.c.address).distinct().offset(2).limit(3).execute().fetchall() - self.assert_(len(r) == 3, repr(r)) - self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) - -class CompoundTest(TestBase): - """test compound statements like UNION, INTERSECT, particularly their ability to nest on - different databases.""" - def setUpAll(self): - global metadata, t1, t2, t3 - metadata = MetaData(testing.db) - t1 = Table('t1', metadata, - Column('col1', Integer, Sequence('t1pkseq'), primary_key=True), - Column('col2', String(30)), - Column('col3', String(40)), - Column('col4', String(30)) - ) - t2 = Table('t2', metadata, - Column('col1', Integer, Sequence('t2pkseq'), primary_key=True), - Column('col2', String(30)), - Column('col3', String(40)), - Column('col4', String(30))) - t3 = Table('t3', metadata, - Column('col1', Integer, Sequence('t3pkseq'), primary_key=True), - Column('col2', String(30)), - Column('col3', String(40)), - Column('col4', String(30))) - metadata.create_all() - - t1.insert().execute([ - dict(col2="t1col2r1", col3="aaa", col4="aaa"), - dict(col2="t1col2r2", col3="bbb", col4="bbb"), - dict(col2="t1col2r3", col3="ccc", col4="ccc"), - ]) - t2.insert().execute([ - dict(col2="t2col2r1", col3="aaa", col4="bbb"), - dict(col2="t2col2r2", col3="bbb", col4="ccc"), - dict(col2="t2col2r3", col3="ccc", col4="aaa"), - ]) - t3.insert().execute([ - dict(col2="t3col2r1", col3="aaa", col4="ccc"), - dict(col2="t3col2r2", col3="bbb", col4="aaa"), - dict(col2="t3col2r3", col3="ccc", col4="bbb"), - ]) - - def tearDownAll(self): - metadata.drop_all() - - def _fetchall_sorted(self, executed): - return sorted([tuple(row) for row in executed.fetchall()]) - - @testing.requires.subqueries - def test_union(self): - (s1, s2) = ( - select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], - t1.c.col2.in_(["t1col2r1", "t1col2r2"])), - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(["t2col2r2", "t2col2r3"])) - ) - u = union(s1, s2) - - wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), - ('ccc', 'aaa')] - found1 = self._fetchall_sorted(u.execute()) - self.assertEquals(found1, wanted) - - found2 = self._fetchall_sorted(u.alias('bar').select().execute()) - self.assertEquals(found2, wanted) - - def test_union_ordered(self): - (s1, s2) = ( - select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], - t1.c.col2.in_(["t1col2r1", "t1col2r2"])), - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(["t2col2r2", "t2col2r3"])) - ) - u = union(s1, s2, order_by=['col3', 'col4']) - - wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), - ('ccc', 'aaa')] - self.assertEquals(u.execute().fetchall(), wanted) - - @testing.fails_on('maxdb', 'FIXME: unknown') - @testing.requires.subqueries - def test_union_ordered_alias(self): - (s1, s2) = ( - select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], - t1.c.col2.in_(["t1col2r1", "t1col2r2"])), - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(["t2col2r2", "t2col2r3"])) - ) - u = union(s1, s2, order_by=['col3', 'col4']) - - wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), - ('ccc', 'aaa')] - self.assertEquals(u.alias('bar').select().execute().fetchall(), wanted) - - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('mysql', 'FIXME: unknown') - @testing.fails_on('sqlite', 'FIXME: unknown') - def test_union_all(self): - e = union_all( - select([t1.c.col3]), - union( - select([t1.c.col3]), - select([t1.c.col3]), - ) - ) - - wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] - found1 = self._fetchall_sorted(e.execute()) - self.assertEquals(found1, wanted) - - found2 = self._fetchall_sorted(e.alias('foo').select().execute()) - self.assertEquals(found2, wanted) - - @testing.crashes('firebird', 'Does not support intersect') - @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('mysql', 'FIXME: unknown') - def test_intersect(self): - i = intersect( - select([t2.c.col3, t2.c.col4]), - select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3) - ) - - wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] - - found1 = self._fetchall_sorted(i.execute()) - self.assertEquals(found1, wanted) - - found2 = self._fetchall_sorted(i.alias('bar').select().execute()) - self.assertEquals(found2, wanted) - - @testing.crashes('firebird', 'Does not support except') - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('mysql', 'FIXME: unknown') - def test_except_style1(self): - e = except_(union( - select([t1.c.col3, t1.c.col4]), - select([t2.c.col3, t2.c.col4]), - select([t3.c.col3, t3.c.col4]), - ), select([t2.c.col3, t2.c.col4])) - - wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), - ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] - - found = self._fetchall_sorted(e.alias('bar').select().execute()) - self.assertEquals(found, wanted) - - @testing.crashes('firebird', 'Does not support except') - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('mysql', 'FIXME: unknown') - def test_except_style2(self): - e = except_(union( - select([t1.c.col3, t1.c.col4]), - select([t2.c.col3, t2.c.col4]), - select([t3.c.col3, t3.c.col4]), - ).alias('foo').select(), select([t2.c.col3, t2.c.col4])) - - wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), - ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] - - found1 = self._fetchall_sorted(e.execute()) - self.assertEquals(found1, wanted) - - found2 = self._fetchall_sorted(e.alias('bar').select().execute()) - self.assertEquals(found2, wanted) - - @testing.crashes('firebird', 'Does not support except') - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('mysql', 'FIXME: unknown') - @testing.fails_on('sqlite', 'FIXME: unknown') - def test_except_style3(self): - # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc - e = except_( - select([t1.c.col3]), # aaa, bbb, ccc - except_( - select([t2.c.col3]), # aaa, bbb, ccc - select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc - ) - ) - self.assertEquals(e.execute().fetchall(), [('ccc',)]) - self.assertEquals(e.alias('foo').select().execute().fetchall(), - [('ccc',)]) - - @testing.crashes('firebird', 'Does not support intersect') - @testing.fails_on('mysql', 'FIXME: unknown') - def test_composite(self): - u = intersect( - select([t2.c.col3, t2.c.col4]), - union( - select([t1.c.col3, t1.c.col4]), - select([t2.c.col3, t2.c.col4]), - select([t3.c.col3, t3.c.col4]), - ).alias('foo').select() - ) - wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] - found = self._fetchall_sorted(u.execute()) - - self.assertEquals(found, wanted) - - @testing.crashes('firebird', 'Does not support intersect') - @testing.fails_on('mysql', 'FIXME: unknown') - def test_composite_alias(self): - ua = intersect( - select([t2.c.col3, t2.c.col4]), - union( - select([t1.c.col3, t1.c.col4]), - select([t2.c.col3, t2.c.col4]), - select([t3.c.col3, t3.c.col4]), - ).alias('foo').select() - ).alias('bar') - - wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] - found = self._fetchall_sorted(ua.select().execute()) - self.assertEquals(found, wanted) - - -class JoinTest(TestBase): - """Tests join execution. - - The compiled SQL emitted by the dialect might be ANSI joins or - theta joins ('old oracle style', with (+) for OUTER). This test - tries to exercise join syntax and uncover any inconsistencies in - `JOIN rhs ON lhs.col=rhs.col` vs `rhs.col=lhs.col`. At least one - database seems to be sensitive to this. - """ - - def setUpAll(self): - global metadata - global t1, t2, t3 - - metadata = MetaData(testing.db) - t1 = Table('t1', metadata, - Column('t1_id', Integer, primary_key=True), - Column('name', String(32))) - t2 = Table('t2', metadata, - Column('t2_id', Integer, primary_key=True), - Column('t1_id', Integer, ForeignKey('t1.t1_id')), - Column('name', String(32))) - t3 = Table('t3', metadata, - Column('t3_id', Integer, primary_key=True), - Column('t2_id', Integer, ForeignKey('t2.t2_id')), - Column('name', String(32))) - metadata.drop_all() - metadata.create_all() - - # t1.10 -> t2.20 -> t3.30 - # t1.11 -> t2.21 - # t1.12 - t1.insert().execute({'t1_id': 10, 'name': 't1 #10'}, - {'t1_id': 11, 'name': 't1 #11'}, - {'t1_id': 12, 'name': 't1 #12'}) - t2.insert().execute({'t2_id': 20, 't1_id': 10, 'name': 't2 #20'}, - {'t2_id': 21, 't1_id': 11, 'name': 't2 #21'}) - t3.insert().execute({'t3_id': 30, 't2_id': 20, 'name': 't3 #30'}) - - def tearDownAll(self): - metadata.drop_all() - - def assertRows(self, statement, expected): - """Execute a statement and assert that rows returned equal expected.""" - - found = sorted([tuple(row) - for row in statement.execute().fetchall()]) - - self.assertEquals(found, sorted(expected)) - - def test_join_x1(self): - """Joins t1->t2.""" - - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id], - from_obj=[t1.join(t2, criteria)]) - self.assertRows(expr, [(10, 20), (11, 21)]) - - def test_join_x2(self): - """Joins t1->t2->t3.""" - - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id], - from_obj=[t1.join(t2, criteria)]) - self.assertRows(expr, [(10, 20), (11, 21)]) - - def test_outerjoin_x1(self): - """Outer joins t1->t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id], - from_obj=[t1.join(t2).join(t3, criteria)]) - self.assertRows(expr, [(10, 20)]) - - def test_outerjoin_x2(self): - """Outer joins t1->t2,t3.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - from_obj=[t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). \ - outerjoin(t3, criteria)]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None), (12, None, None)]) - - def test_outerjoin_where_x2_t1(self): - """Outer joins t1->t2,t3, where on t1.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t1.c.name == 't1 #10', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t1.c.t1_id < 12, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t2(self): - """Outer joins t1->t2,t3, where on t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t2.c.name == 't2 #20', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t2.c.t2_id < 29, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t1t2(self): - """Outer joins t1->t2,t3, where on t1 and t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, 29 > t2.c.t2_id), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t3(self): - """Outer joins t1->t2,t3, where on t3.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t3.c.name == 't3 #30', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t3.c.t3_id < 39, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - def test_outerjoin_where_x2_t1t3(self): - """Outer joins t1->t2,t3, where on t1 and t3.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - def test_outerjoin_where_x2_t1t2(self): - """Outer joins t1->t2,t3, where on t1 and t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 12, t2.c.t2_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t1t2t3(self): - """Outer joins t1->t2,t3, where on t1, t2 and t3.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', - t2.c.name == 't2 #20', - t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, - t2.c.t2_id < 29, - t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - def test_mixed(self): - """Joins t1->t2, outer t2->t3.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - print expr - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_mixed_where(self): - """Joins t1->t2, outer t2->t3, plus a where on each table in turn.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t1.c.name == 't1 #10', - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t2.c.name == 't2 #20', - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - t3.c.name == 't3 #30', - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t2.c.name == 't2 #20', t3.c.name == 't3 #30'), - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', - t2.c.name == 't2 #20', - t3.c.name == 't3 #30'), - from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - -class OperatorTest(TestBase): - def setUpAll(self): - global metadata, flds - metadata = MetaData(testing.db) - flds = Table('flds', metadata, - Column('idcol', Integer, Sequence('t1pkseq'), primary_key=True), - Column('intcol', Integer), - Column('strcol', String(50)), - ) - metadata.create_all() - - flds.insert().execute([ - dict(intcol=5, strcol='foo'), - dict(intcol=13, strcol='bar') - ]) - - def tearDownAll(self): - metadata.drop_all() - - @testing.fails_on('maxdb', 'FIXME: unknown') - def test_modulo(self): - self.assertEquals( - select([flds.c.intcol % 3], - order_by=flds.c.idcol).execute().fetchall(), - [(2,),(1,)] - ) - - - -if __name__ == "__main__": - testenv.main() |
