summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2006-06-05 17:25:51 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2006-06-05 17:25:51 +0000
commit120dcee5a71187d4bebfe50aedbbefb09184cac1 (patch)
treef2a090a510c8df405d0b1bef2936bafa3511be07 /test/sql
parentf8314ef9ff08af5f104731de402d6e6bd8c043f3 (diff)
downloadsqlalchemy-120dcee5a71187d4bebfe50aedbbefb09184cac1.tar.gz
reorganized unit tests into subdirectories
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/__init__.py0
-rw-r--r--test/sql/alltests.py32
-rw-r--r--test/sql/case_statement.py59
-rw-r--r--test/sql/defaults.py168
-rw-r--r--test/sql/indexes.py106
-rw-r--r--test/sql/query.py222
-rw-r--r--test/sql/select.py649
-rwxr-xr-xtest/sql/selectable.py138
-rw-r--r--test/sql/testtypes.py277
9 files changed, 1651 insertions, 0 deletions
diff --git a/test/sql/__init__.py b/test/sql/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/test/sql/__init__.py
diff --git a/test/sql/alltests.py b/test/sql/alltests.py
new file mode 100644
index 000000000..23a7a6236
--- /dev/null
+++ b/test/sql/alltests.py
@@ -0,0 +1,32 @@
+import testbase
+import unittest
+
+
+def suite():
+ modules_to_test = (
+ 'sql.testtypes',
+ 'sql.indexes',
+
+ # SQL syntax
+ 'sql.select',
+ 'sql.selectable',
+ 'sql.case_statement',
+
+ # assorted round-trip tests
+ 'sql.query',
+
+ # defaults, sequences (postgres/oracle)
+ 'sql.defaults',
+ )
+ alltests = unittest.TestSuite()
+ for name in modules_to_test:
+ mod = __import__(name)
+ for token in name.split('.')[1:]:
+ mod = getattr(mod, token)
+ alltests.addTest(unittest.findTestCases(mod, suiteClass=None))
+ return alltests
+
+
+
+if __name__ == '__main__':
+ testbase.runTests(suite())
diff --git a/test/sql/case_statement.py b/test/sql/case_statement.py
new file mode 100644
index 000000000..fc0e919a5
--- /dev/null
+++ b/test/sql/case_statement.py
@@ -0,0 +1,59 @@
+import sys
+import testbase
+from sqlalchemy import *
+
+
+class CaseTest(testbase.PersistTest):
+
+ def setUpAll(self):
+ global info_table
+ info_table = Table('infos', testbase.db,
+ Column('pk', Integer, primary_key=True),
+ Column('info', String))
+
+ info_table.create()
+
+ info_table.insert().execute(
+ {'pk':1, 'info':'pk_1_data'},
+ {'pk':2, 'info':'pk_2_data'},
+ {'pk':3, 'info':'pk_3_data'},
+ {'pk':4, 'info':'pk_4_data'},
+ {'pk':5, 'info':'pk_5_data'})
+ def tearDownAll(self):
+ info_table.drop()
+
+ def testcase(self):
+ inner = select([case([[info_table.c.pk < 3, literal('lessthan3', type=String)],
+ [info_table.c.pk >= 3, literal('gt3', type=String)]]).label('x'),
+ info_table.c.pk, info_table.c.info], from_obj=[info_table]).alias('q_inner')
+
+ inner_result = inner.execute().fetchall()
+
+ # Outputs:
+ # lessthan3 1 pk_1_data
+ # lessthan3 2 pk_2_data
+ # gt3 3 pk_3_data
+ # gt3 4 pk_4_data
+ # gt3 5 pk_5_data
+ assert inner_result == [
+ ('lessthan3', 1, 'pk_1_data'),
+ ('lessthan3', 2, 'pk_2_data'),
+ ('gt3', 3, 'pk_3_data'),
+ ('gt3', 4, 'pk_4_data'),
+ ('gt3', 5, 'pk_5_data'),
+ ]
+
+ outer = select([inner])
+
+ outer_result = outer.execute().fetchall()
+
+ assert outer_result == [
+ ('lessthan3', 1, 'pk_1_data'),
+ ('lessthan3', 2, 'pk_2_data'),
+ ('gt3', 3, 'pk_3_data'),
+ ('gt3', 4, 'pk_4_data'),
+ ('gt3', 5, 'pk_5_data'),
+ ]
+
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
new file mode 100644
index 000000000..57b4388a1
--- /dev/null
+++ b/test/sql/defaults.py
@@ -0,0 +1,168 @@
+from testbase import PersistTest
+import sqlalchemy.util as util
+import unittest, sys, os
+import sqlalchemy.schema as schema
+import testbase
+from sqlalchemy import *
+import sqlalchemy
+
+db = testbase.db
+
+class DefaultTest(PersistTest):
+
+ def setUpAll(self):
+ global t, f, f2, ts, currenttime
+ x = {'x':50}
+ def mydefault():
+ x['x'] += 1
+ return x['x']
+
+ use_function_defaults = db.engine.name == 'postgres' or db.engine.name == 'oracle'
+ is_oracle = db.engine.name == 'oracle'
+
+ # select "count(1)" returns different results on different DBs
+ # also correct for "current_date" compatible as column default, value differences
+ currenttime = func.current_date(type=Date, engine=db);
+ if is_oracle:
+ ts = db.func.trunc(func.sysdate(), column("'DAY'")).scalar()
+ f = select([func.count(1) + 5], engine=db).scalar()
+ f2 = select([func.count(1) + 14], engine=db).scalar()
+ # TODO: engine propigation across nested functions not working
+ currenttime = func.trunc(currenttime, column("'DAY'"), engine=db)
+ def1 = currenttime
+ def2 = func.trunc(text("sysdate"), column("'DAY'"))
+ deftype = Date
+ elif use_function_defaults:
+ f = select([func.count(1) + 5], engine=db).scalar()
+ f2 = select([func.count(1) + 14], engine=db).scalar()
+ def1 = currenttime
+ def2 = text("current_date")
+ deftype = Date
+ ts = db.func.current_date().scalar()
+ else:
+ f = select([func.count(1) + 5], engine=db).scalar()
+ f2 = select([func.count(1) + 14], engine=db).scalar()
+ def1 = def2 = "3"
+ ts = 3
+ deftype = Integer
+
+ t = Table('default_test1', db,
+ # python function
+ Column('col1', Integer, primary_key=True, default=mydefault),
+
+ # python literal
+ Column('col2', String(20), default="imthedefault", onupdate="im the update"),
+
+ # preexecute expression
+ Column('col3', Integer, default=func.count(1) + 5, onupdate=func.count(1) + 14),
+
+ # SQL-side default from sql expression
+ Column('col4', deftype, PassiveDefault(def1)),
+
+ # SQL-side default from literal expression
+ Column('col5', deftype, PassiveDefault(def2)),
+
+ # preexecute + update timestamp
+ Column('col6', Date, default=currenttime, onupdate=currenttime)
+ )
+ t.create()
+
+ def tearDownAll(self):
+ t.drop()
+
+ def tearDown(self):
+ t.delete().execute()
+
+ def teststandalone(self):
+ c = db.engine.contextual_connect()
+ x = c.execute(t.c.col1.default)
+ y = t.c.col2.default.execute()
+ z = c.execute(t.c.col3.default)
+ self.assert_(50 <= x <= 57)
+ self.assert_(y == 'imthedefault')
+ self.assert_(z == f)
+ # mysql/other db's return 0 or 1 for count(1)
+ self.assert_(5 <= z <= 6)
+
+ def testinsert(self):
+ r = t.insert().execute()
+ self.assert_(r.lastrow_has_defaults())
+ t.insert().execute()
+ t.insert().execute()
+
+ ctexec = currenttime.scalar()
+ self.echo("Currenttime "+ repr(ctexec))
+ l = t.select().execute()
+ self.assert_(l.fetchall() == [(51, 'imthedefault', f, ts, ts, ctexec), (52, 'imthedefault', f, ts, ts, ctexec), (53, 'imthedefault', f, ts, ts, ctexec)])
+
+ def testinsertvalues(self):
+ t.insert(values={'col3':50}).execute()
+ l = t.select().execute()
+ self.assert_(l.fetchone()['col3'] == 50)
+
+
+ def testupdate(self):
+ r = t.insert().execute()
+ pk = r.last_inserted_ids()[0]
+ t.update(t.c.col1==pk).execute(col4=None, col5=None)
+ ctexec = currenttime.scalar()
+ self.echo("Currenttime "+ repr(ctexec))
+ l = t.select(t.c.col1==pk).execute()
+ l = l.fetchone()
+ self.assert_(l == (pk, 'im the update', f2, None, None, ctexec))
+ # mysql/other db's return 0 or 1 for count(1)
+ self.assert_(14 <= f2 <= 15)
+
+ def testupdatevalues(self):
+ r = t.insert().execute()
+ pk = r.last_inserted_ids()[0]
+ t.update(t.c.col1==pk, values={'col3': 55}).execute()
+ l = t.select(t.c.col1==pk).execute()
+ l = l.fetchone()
+ self.assert_(l['col3'] == 55)
+
+class SequenceTest(PersistTest):
+
+ def setUpAll(self):
+ if testbase.db.engine.name != 'postgres' and testbase.db.engine.name != 'oracle':
+ return
+ global cartitems
+ cartitems = Table("cartitems", db,
+ Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
+ Column("description", String(40)),
+ Column("createdate", DateTime())
+ )
+
+ cartitems.create()
+
+ @testbase.supported('postgres', 'oracle')
+ def testsequence(self):
+ cartitems.insert().execute(description='hi')
+ cartitems.insert().execute(description='there')
+ cartitems.insert().execute(description='lala')
+
+ cartitems.select().execute().fetchall()
+
+
+ @testbase.supported('postgres', 'oracle')
+ def teststandalone(self):
+ s = Sequence("my_sequence", metadata=testbase.db)
+ s.create()
+ try:
+ x = s.execute()
+ self.assert_(x == 1)
+ finally:
+ s.drop()
+
+ @testbase.supported('postgres', 'oracle')
+ def teststandalone2(self):
+ x = cartitems.c.cart_id.sequence.execute()
+ self.assert_(1 <= x <= 4)
+
+ def tearDownAll(self):
+ if testbase.db.engine.name != 'postgres' and testbase.db.engine.name != 'oracle':
+ return
+ cartitems.drop()
+
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/sql/indexes.py b/test/sql/indexes.py
new file mode 100644
index 000000000..f1e55e406
--- /dev/null
+++ b/test/sql/indexes.py
@@ -0,0 +1,106 @@
+from sqlalchemy import *
+import sys
+import testbase
+
+class IndexTest(testbase.AssertMixin):
+
+ def setUp(self):
+ global metadata
+ metadata = BoundMetaData(testbase.db)
+ self.echo = testbase.db.echo
+ self.logger = testbase.db.logger
+
+ def tearDown(self):
+ testbase.db.echo = self.echo
+ testbase.db.logger = testbase.db.engine.logger = self.logger
+ metadata.drop_all()
+
+ def test_index_create(self):
+ employees = Table('employees', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('first_name', String(30)),
+ Column('last_name', String(30)),
+ Column('email_address', String(30)))
+ employees.create()
+
+ i = Index('employee_name_index',
+ employees.c.last_name, employees.c.first_name)
+ i.create()
+ assert employees.indexes['employee_name_index'] is i
+
+ i2 = Index('employee_email_index',
+ employees.c.email_address, unique=True)
+ i2.create()
+ assert employees.indexes['employee_email_index'] is i2
+
+ def test_index_create_camelcase(self):
+ """test that mixed-case index identifiers are legal"""
+ employees = Table('companyEmployees', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('firstName', String(30)),
+ Column('lastName', String(30)),
+ Column('emailAddress', String(30)))
+
+ employees.create()
+
+ i = Index('employeeNameIndex',
+ employees.c.lastName, employees.c.firstName)
+ i.create()
+
+ i = Index('employeeEmailIndex',
+ employees.c.emailAddress, unique=True)
+ i.create()
+
+ # Check that the table is useable. This is mostly for pg,
+ # which can be somewhat sticky with mixed-case identifiers
+ employees.insert().execute(firstName='Joe', lastName='Smith', id=0)
+ ss = employees.select().execute().fetchall()
+ assert ss[0].firstName == 'Joe'
+ assert ss[0].lastName == 'Smith'
+
+ def test_index_create_inline(self):
+ """Test indexes defined with tables"""
+
+ capt = []
+ class dummy:
+ pass
+ stream = dummy()
+ stream.write = capt.append
+ testbase.db.logger = testbase.db.engine.logger = stream
+ events = Table('events', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), unique=True),
+ Column('location', String(30), index=True),
+ Column('sport', String(30),
+ unique='sport_announcer'),
+ Column('announcer', String(30),
+ unique='sport_announcer'),
+ Column('winner', String(30), index='idx_winners'))
+
+ index_names = [ ix.name for ix in events.indexes ]
+ assert 'ux_events_name' in index_names
+ assert 'ix_events_location' in index_names
+ assert 'sport_announcer' in index_names
+ assert 'idx_winners' in index_names
+ assert len(index_names) == 4
+
+ events.create()
+
+ # verify that the table is functional
+ events.insert().execute(id=1, name='hockey finals', location='rink',
+ sport='hockey', announcer='some canadian',
+ winner='sweden')
+ ss = events.select().execute().fetchall()
+
+ assert capt[0].strip().startswith('CREATE TABLE events')
+ assert capt[3].strip() == \
+ 'CREATE UNIQUE INDEX ux_events_name ON events (name)'
+ assert capt[6].strip() == \
+ 'CREATE INDEX ix_events_location ON events (location)'
+ assert capt[9].strip() == \
+ 'CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'
+ assert capt[12].strip() == \
+ 'CREATE INDEX idx_winners ON events (winner)'
+
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/sql/query.py b/test/sql/query.py
new file mode 100644
index 000000000..2148aae67
--- /dev/null
+++ b/test/sql/query.py
@@ -0,0 +1,222 @@
+from testbase import PersistTest
+import testbase
+import unittest, sys, datetime
+
+import sqlalchemy.databases.sqlite as sqllite
+
+import tables
+db = testbase.db
+from sqlalchemy import *
+from sqlalchemy.engine import ResultProxy, RowProxy
+
+class QueryTest(PersistTest):
+
+ def setUpAll(self):
+ global users
+ users = Table('query_users', db,
+ Column('user_id', INT, primary_key = True),
+ Column('user_name', VARCHAR(20)),
+ redefine = True
+ )
+ users.create()
+
+ def setUp(self):
+ self.users = users
+ def tearDown(self):
+ self.users.delete().execute()
+
+ def tearDownAll(self):
+ global users
+ users.drop()
+
+ def testinsert(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ print repr(self.users.select().execute().fetchall())
+
+ def testupdate(self):
+
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ print repr(self.users.select().execute().fetchall())
+
+ self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
+ print repr(self.users.select().execute().fetchall())
+
+ def testrowiteration(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ self.users.insert().execute(user_id = 8, user_name = 'ed')
+ self.users.insert().execute(user_id = 9, user_name = 'fred')
+ r = self.users.select().execute()
+ l = []
+ for row in r:
+ l.append(row)
+ self.assert_(len(l) == 3)
+
+ def test_global_metadata(self):
+ t1 = Table('table1', Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+ t2 = Table('table2', Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+
+ assert t1.c.col1
+ global_connect(testbase.db)
+ default_metadata.create_all()
+ try:
+ assert t1.count().scalar() == 0
+ finally:
+ default_metadata.drop_all()
+ default_metadata.clear()
+
+ def testpassiveoverride(self):
+ """primarily for postgres, tests that when we get a primary key column back
+ from reflecting a table which has a default value on it, we pre-execute
+ that PassiveDefault upon insert, even though PassiveDefault says
+ "let the database execute this", because in postgres we must have all the primary
+ key values in memory before insert; otherwise we cant locate the just inserted row."""
+ if db.engine.name != 'postgres':
+ return
+ try:
+ db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """, None)
+
+ t = Table("speedy_users", db, autoload=True)
+ t.insert().execute(user_name='user', user_password='lala')
+ l = t.select().execute().fetchall()
+ print l
+ self.assert_(l == [(1, 'user', 'lala')])
+ finally:
+ db.execute("drop table speedy_users", None)
+
+ def testschema(self):
+ if not db.engine.__module__.endswith('postgres'):
+ return
+
+ test_table = Table('my_table', db,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(20), nullable=False),
+ schema='alt_schema'
+ )
+ test_table.create()
+ try:
+ # plain insert
+ test_table.insert().execute(data='test')
+
+ # try with a PassiveDefault
+ test_table.deregister()
+ test_table = Table('my_table', db, autoload=True, redefine=True, schema='alt_schema')
+ test_table.insert().execute(data='test')
+
+ finally:
+ test_table.drop()
+
+
+ def testdelete(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ self.users.insert().execute(user_id = 8, user_name = 'fred')
+ print repr(self.users.select().execute().fetchall())
+
+ self.users.delete(self.users.c.user_name == 'fred').execute()
+
+ print repr(self.users.select().execute().fetchall())
+
+ def testselectlimit(self):
+ self.users.insert().execute(user_id=1, user_name='john')
+ self.users.insert().execute(user_id=2, user_name='jack')
+ self.users.insert().execute(user_id=3, user_name='ed')
+ self.users.insert().execute(user_id=4, user_name='wendy')
+ self.users.insert().execute(user_id=5, user_name='laura')
+ self.users.insert().execute(user_id=6, user_name='ralph')
+ self.users.insert().execute(user_id=7, user_name='fido')
+ r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
+ r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
+ r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r==[(6, 'ralph'), (7, 'fido')])
+
+
+ def test_column_accessor(self):
+ self.users.insert().execute(user_id=1, user_name='john')
+ self.users.insert().execute(user_id=2, user_name='jack')
+ r = self.users.select(self.users.c.user_id==2).execute().fetchone()
+ self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+
+ def test_keys(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(r.keys(), ['user_id', 'user_name'])
+
+ def test_items(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])
+
+ def test_len(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(len(r), 2)
+ r.close()
+ r = db.execute('select user_name, user_id from query_users', {}).fetchone()
+ self.assertEqual(len(r), 2)
+ r.close()
+ r = db.execute('select user_name from query_users', {}).fetchone()
+ self.assertEqual(len(r), 1)
+ r.close()
+
+ def test_column_order_with_simple_query(self):
+ # should return values in column definition order
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select(self.users.c.user_id==1).execute().fetchone()
+ self.assertEqual(r[0], 1)
+ self.assertEqual(r[1], 'foo')
+ self.assertEqual(r.keys(), ['user_id', 'user_name'])
+ self.assertEqual(r.values(), [1, 'foo'])
+
+ def test_column_order_with_text_query(self):
+ # should return values in query order
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = db.execute('select user_name, user_id from query_users', {}).fetchone()
+ self.assertEqual(r[0], 'foo')
+ self.assertEqual(r[1], 1)
+ self.assertEqual(r.keys(), ['user_name', 'user_id'])
+ self.assertEqual(r.values(), ['foo', 1])
+
+ @testbase.unsupported('oracle', 'firebird')
+ def test_column_accessor_shadow(self):
+ shadowed = Table('test_shadowed', db,
+ Column('shadow_id', INT, primary_key = True),
+ Column('shadow_name', VARCHAR(20)),
+ Column('parent', VARCHAR(20)),
+ Column('row', VARCHAR(40)),
+ Column('__parent', VARCHAR(20)),
+ Column('__row', VARCHAR(20)),
+ redefine = True
+ )
+ shadowed.create()
+ try:
+ shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
+ r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+ self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
+ self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
+ self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
+ self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+ self.assert_(r['__parent'] == 'Hidden parent')
+ self.assert_(r['__row'] == 'Hidden row')
+ try:
+ print r.__parent, r.__row
+ self.fail('Should not allow access to private attributes')
+ except AttributeError:
+ pass # expected
+ r.close()
+ finally:
+ shadowed.drop()
+
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/sql/select.py b/test/sql/select.py
new file mode 100644
index 000000000..19d39e41f
--- /dev/null
+++ b/test/sql/select.py
@@ -0,0 +1,649 @@
+
+from sqlalchemy import *
+from sqlalchemy.databases import sqlite, postgres, mysql, oracle
+from testbase import PersistTest
+import unittest, re
+import testbase
+
+# the select test now tests almost completely with TableClause/ColumnClause objects,
+# which are free-roaming table/column objects not attached to any database.
+# so SQLAlchemy's SQL construction engine can be used with no database dependencies at all.
+
+table1 = table('mytable',
+ column('myid'),
+ column('name'),
+ column('description'),
+)
+
+table2 = table(
+ 'myothertable',
+ column('otherid'),
+ column('othername'),
+)
+
+table3 = table(
+ 'thirdtable',
+ column('userid'),
+ column('otherstuff'),
+)
+
+metadata = MetaData()
+table4 = Table(
+ 'remotetable', metadata,
+ Column('rem_id', Integer, primary_key=True),
+ Column('datatype_id', Integer),
+ Column('value', String(20)),
+ schema = 'remote_owner'
+)
+
+users = table('users',
+ column('user_id'),
+ column('user_name'),
+ column('password'),
+)
+
+addresses = table('addresses',
+ column('address_id'),
+ column('user_id'),
+ column('street'),
+ column('city'),
+ column('state'),
+ column('zip')
+)
+
+class SQLTest(PersistTest):
+ def runtest(self, clause, result, dialect = None, params = None, checkparams = None):
+ c = clause.compile(parameters=params, dialect=dialect)
+ self.echo("\nSQL String:\n" + str(c) + repr(c.get_params()))
+ cc = re.sub(r'\n', '', str(c))
+ self.assert_(cc == result, str(c) + "\n does not match \n" + result)
+ if checkparams is not None:
+ if isinstance(checkparams, list):
+ self.assert_(c.get_params().values() == checkparams, "params dont match ")
+ else:
+ self.assert_(c.get_params() == checkparams, "params dont match" + repr(c.get_params()))
+
+class SelectTest(SQLTest):
+ def testtableselect(self):
+ self.runtest(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable")
+
+ self.runtest(select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \
+myothertable.othername FROM mytable, myothertable")
+
+ def testselectselect(self):
+ """tests placing select statements in the column clause of another select, for the
+ purposes of selecting from the exported columns of that select."""
+ s = select([table1], table1.c.name == 'jack')
+ self.runtest(
+ select(
+ [s],
+ s.c.myid == 7
+ )
+ ,
+ "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid")
+
+ sq = select([table1])
+ self.runtest(
+ sq.select(),
+ "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)"
+ )
+
+ sq = select(
+ [table1],
+ ).alias('sq')
+
+ self.runtest(
+ sq.select(sq.c.myid == 7),
+ "SELECT sq.myid, sq.name, sq.description FROM \
+(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid"
+ )
+
+ sq = select(
+ [table1, table2],
+ and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid),
+ use_labels = True
+ ).alias('sq')
+
+ sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
+mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
+myothertable.othername AS myothertable_othername FROM mytable, myothertable \
+WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid"
+
+ self.runtest(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
+sq.myothertable_othername FROM (" + sqstring + ") AS sq")
+
+ sq2 = select(
+ [sq],
+ use_labels = True
+ ).alias('sq2')
+
+ self.runtest(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \
+sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \
+(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \
+sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \
+sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") AS sq) AS sq2")
+
+ def testwheresubquery(self):
+ self.runtest(
+ table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid AS otherid FROM myothertable WHERE mytable.name = myothertable.othername)"
+ )
+
+ self.runtest(
+ table1.select(exists([1], table2.c.otherid == table1.c.myid)),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
+ )
+
+ talias = table1.alias('ta')
+ s = subquery('sq2', [talias], exists([1], table2.c.otherid == talias.c.myid))
+ self.runtest(
+ select([s, table1])
+ ,"SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT ta.myid AS myid, ta.name AS name, ta.description AS description FROM mytable AS ta WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = ta.myid)) AS sq2, mytable")
+
+ s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
+ self.runtest(
+ select([users, s.c.street], from_obj=[s]),
+ """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
+
+ def testcolumnsubquery(self):
+ s = select([table1.c.myid], scalar=True, correlate=False)
+ self.runtest(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid AS myid FROM mytable) FROM mytable")
+
+ s = select([table1.c.myid], scalar=True)
+ self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid AS myid FROM mytable) FROM myothertable")
+
+
+ zips = table('zips',
+ column('zipcode'),
+ column('latitude'),
+ column('longitude'),
+ )
+ places = table('places',
+ column('id'),
+ column('nm')
+ )
+ zip = '12345'
+ qlat = select([zips.c.latitude], zips.c.zipcode == zip, scalar=True, correlate=False)
+ qlng = select([zips.c.longitude], zips.c.zipcode == zip, scalar=True, correlate=False)
+
+ q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
+ zips.c.zipcode==zip,
+ order_by = ['dist', places.c.nm]
+ )
+
+ self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude AS latitude FROM zips WHERE zips.zipcode = :zips_zipco_1), (SELECT zips.longitude AS longitude FROM zips WHERE zips.zipcode = :zips_zipco_2)) AS dist FROM places, zips WHERE zips.zipcode = :zips_zipcode ORDER BY dist, places.nm")
+
+ zalias = zips.alias('main_zip')
+ qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
+ qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
+ q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
+ order_by = ['dist', places.c.nm]
+ )
+ self.runtest(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude AS latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude AS longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm")
+
+ def testand(self):
+ self.runtest(
+ select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")),
+ "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()"
+ )
+
+ def testor(self):
+ self.runtest(
+ select([table1], and_(
+ table1.c.myid == 12,
+ or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9),
+ "sysdate() = today()",
+ )),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_otherna_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()",
+ checkparams = {'myothertable_othername': 'asdf', 'myothertable_otherna_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12}
+ )
+
+ def testoperators(self):
+ self.runtest(
+ table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name)"
+ )
+
+ self.runtest(
+ literal("a") + literal("b") * literal("c"), ":literal + (:liter_1 * :liter_2)"
+ )
+
+ def testmultiparam(self):
+ self.runtest(
+ select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')),
+ "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_my_1 OR mytable.myid = :mytable_my_2"
+ )
+
+ def testorderby(self):
+ self.runtest(
+ table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
+ "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
+ )
+ def testgroupby(self):
+ self.runtest(
+ select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername"
+ )
+
+ def testoraclelimit(self):
+ metadata = MetaData()
+ users = Table('users', metadata, Column('name', String(10), key='username'))
+ self.runtest(select([users.c.username], limit=5), "SELECT name FROM (SELECT users.name AS name, ROW_NUMBER() OVER (ORDER BY users.rowid) AS ora_rn FROM users) WHERE ora_rn<=5", dialect=oracle.dialect())
+
+ def testgroupby_and_orderby(self):
+ self.runtest(
+ select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
+ )
+
+ def testforupdate(self):
+ self.runtest(
+ table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE"
+ )
+ def testalias(self):
+ # test the alias for a table1. column names stay the same, table name "changes" to "foo".
+ self.runtest(
+ select([alias(table1, 'foo')])
+ ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
+
+ # create a select for a join of two tables. use_labels means the column names will have
+ # labels tablename_columnname, which become the column keys accessible off the Selectable object.
+ # also, only use one column from the second table and all columns from the first table1.
+ q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True)
+
+ # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
+ a = alias(q, 't2view')
+
+ # select from that alias, also using labels. two levels of labels should produce two underscores.
+ # also, reference the column "mytable_myid" off of the t2view alias.
+ self.runtest(
+ a.select(a.c.mytable_myid == 9, use_labels = True),
+ "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
+t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
+(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
+myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
+WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
+ )
+
+ def testtext(self):
+ self.runtest(
+ text("select * from foo where lala = bar") ,
+ "select * from foo where lala = bar"
+ )
+
+ self.runtest(select(
+ ["foobar(a)", "pk_foo_bar(syslaal)"],
+ "a = 12",
+ from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+
+ # test building a select query programmatically with text
+ s = select()
+ s.append_column("column1")
+ s.append_column("column2")
+ s.append_whereclause("column1=12")
+ s.append_whereclause("column2=19")
+ s.order_by("column1")
+ s.append_from("table1")
+ self.runtest(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1")
+
+ def testtextbinds(self):
+ self.runtest(
+ text("select * from foo where lala=:bar and hoho=:whee"),
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={'bar':4, 'whee': 7},
+ params={'bar':4, 'whee': 7, 'hoho':10},
+ )
+
+ dialect = postgres.dialect()
+ self.runtest(
+ text("select * from foo where lala=:bar and hoho=:whee"),
+ "select * from foo where lala=%(bar)s and hoho=%(whee)s",
+ checkparams={'bar':4, 'whee': 7},
+ params={'bar':4, 'whee': 7, 'hoho':10},
+ dialect=dialect
+ )
+
+ dialect = sqlite.dialect()
+ self.runtest(
+ text("select * from foo where lala=:bar and hoho=:whee"),
+ "select * from foo where lala=? and hoho=?",
+ checkparams=[4, 7],
+ params={'bar':4, 'whee': 7, 'hoho':10},
+ dialect=dialect
+ )
+
+ def testtextmix(self):
+ self.runtest(select(
+ [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
+ and_(
+ "foo.id = foofoo(lala)",
+ "datetime(foo) = Today",
+ table1.c.myid == table2.c.otherid,
+ )
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
+FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
+
+ def testtextualsubquery(self):
+ self.runtest(select(
+ [alias(table1, 't'), "foo.f"],
+ "foo.f = t.id",
+ from_obj = ["(select f from bar where lala=heyhey) foo"]
+ ),
+ "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
+
+ def testliteral(self):
+ self.runtest(select([literal("foo") + literal("bar")], from_obj=[table1]),
+ "SELECT :literal + :liter_1 FROM mytable")
+
+ def testcalculatedcolumns(self):
+ value_tbl = table('values',
+ Column('id', Integer),
+ Column('val1', Float),
+ Column('val2', Float),
+ )
+
+ self.runtest(
+ select([value_tbl.c.id, (value_tbl.c.val2 -
+ value_tbl.c.val1)/value_tbl.c.val1]),
+ "SELECT values.id, (values.val2 - values.val1) / values.val1 FROM values"
+ )
+
+ self.runtest(
+ select([value_tbl.c.id], (value_tbl.c.val2 -
+ value_tbl.c.val1)/value_tbl.c.val1 > 2.0),
+ "SELECT values.id FROM values WHERE ((values.val2 - values.val1) / values.val1) > :literal"
+ )
+
+ self.runtest(
+ select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
+ "SELECT values.id FROM values WHERE ((values.val1 / (values.val2 - values.val1)) / values.val1) > :literal"
+ )
+
+ def testfunction(self):
+ """tests the generation of functions using the func keyword"""
+ # test an expression with a function
+ self.runtest(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
+ "lala(:lala, :la_1, :literal, mytable.myid) * myothertable.otherid")
+
+ # test it in a SELECT
+ self.runtest(select([func.count(table1.c.myid)]),
+ "SELECT count(mytable.myid) FROM mytable")
+
+ # test a "dotted" function name
+ self.runtest(select([func.foo.bar.lala(table1.c.myid)]),
+ "SELECT foo.bar.lala(mytable.myid) FROM mytable")
+
+ # test the bind parameter name with a "dotted" function name is only the name
+ # (limits the length of the bind param name)
+ self.runtest(select([func.foo.bar.lala(12)]),
+ "SELECT foo.bar.lala(:lala)")
+
+ # test a dotted func off the engine itself
+ self.runtest(func.lala.hoho(7), "lala.hoho(:hoho)")
+
+ def testjoin(self):
+ self.runtest(
+ join(table2, table1, table1.c.myid == table2.c.otherid).select(),
+ "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, \
+mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertable.otherid"
+ )
+
+ self.runtest(
+ select(
+ [table1],
+ from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid)]
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
+
+ self.runtest(
+ select(
+ [join(join(table1, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid == table3.c.userid)
+ ]),
+ "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
+ )
+
+ self.runtest(
+ join(users, addresses, users.c.user_id==addresses.c.user_id).select(),
+ "SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id"
+ )
+
+ def testmultijoin(self):
+ self.runtest(
+ select([table1, table2, table3],
+
+ from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).outerjoin(table3, table1.c.myid==table3.c.userid)]
+
+ #from_obj = [outerjoin(join(table, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid==table3.c.userid)]
+ )
+ ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid"
+ )
+ self.runtest(
+ select([table1, table2, table3],
+ from_obj = [outerjoin(table1, join(table2, table3, table2.c.otherid == table3.c.userid), table1.c.myid==table2.c.otherid)]
+ )
+ ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid"
+ )
+
+ def testunion(self):
+ x = union(
+ select([table1], table1.c.myid == 5),
+ select([table1], table1.c.myid == 12),
+ order_by = [table1.c.myid],
+ )
+
+ self.runtest(x, "SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.myid = :mytable_myid UNION \
+SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.myid = :mytable_my_1 ORDER BY mytable.myid")
+
+ self.runtest(
+ union(
+ select([table1]),
+ select([table2]),
+ select([table3])
+ )
+ ,
+ "SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \
+FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable")
+
+ u = union(
+ select([table1]),
+ select([table2]),
+ select([table3])
+ )
+ assert u.corresponding_column(table2.c.otherid) is u.c.otherid
+
+
+ def testouterjoin(self):
+ # test an outer join. the oracle module should take the ON clause of the join and
+ # move it up to the WHERE clause of its parent select, and append (+) to all right-hand-side columns
+ # within the original onclause, but leave right-hand-side columns unchanged outside of the onclause
+ # parameters.
+
+ query = select(
+ [table1, table2],
+ and_(
+ table1.c.name == 'fred',
+ table1.c.myid == 10,
+ table2.c.othername != 'jack',
+ "EXISTS (select yay from foo where boo = lar)"
+ ),
+ from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ]
+ )
+
+ self.runtest(query,
+ "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \
+WHERE mytable.name = %(mytable_name)s AND mytable.myid = %(mytable_myid)s AND \
+myothertable.othername != %(myothertable_othername)s AND \
+EXISTS (select yay from foo where boo = lar)",
+ dialect=postgres.dialect()
+ )
+
+ self.runtest(query,
+ "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
+mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
+myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)",
+ dialect=oracle.OracleDialect(use_ansi = False))
+
+ query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid)
+ self.runtest(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON thirdtable.userid = myothertable.otherid")
+ self.runtest(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
+
+ def testbindparam(self):
+ self.runtest(select(
+ [table1, table2],
+ and_(table1.c.myid == table2.c.otherid,
+ table1.c.name == bindparam('mytablename'),
+ )
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename"
+ )
+
+ # check that the bind params sent along with a compile() call
+ # get preserved when the params are retreived later
+ s = select([table1], table1.c.myid == bindparam('test'))
+ c = s.compile(parameters = {'test' : 7})
+ self.assert_(c.get_params() == {'test' : 7})
+
+
+ def testin(self):
+ self.runtest(select([table1], table1.c.myid.in_(1, 2, 3)),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_my_1, :mytable_my_2)")
+
+ self.runtest(select([table1], table1.c.myid.in_(select([table2.c.otherid]))),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid AS otherid FROM myothertable)")
+
+ def testlateargs(self):
+ """tests that a SELECT clause will have extra "WHERE" clauses added to it at compile time if extra arguments
+ are sent"""
+
+ self.runtest(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid", params={'myid':'3', 'name':'jack'})
+
+ self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3'})
+
+ self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3', 'name':'fred'})
+
+ def testcast(self):
+ tbl = table('casttest',
+ Column('id', Integer),
+ Column('v1', Float),
+ Column('v2', Float),
+ Column('ts', TIMESTAMP),
+ )
+
+ def check_results(dialect, expected_results, literal):
+ self.assertEqual(len(expected_results), 5, 'Incorrect number of expected results')
+ self.assertEqual(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0])
+ self.assertEqual(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[1])
+ self.assertEqual(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2])
+ self.assertEqual(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3]))
+ self.assertEqual(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4]))
+ sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
+ self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest")
+ # first test with Postgres engine
+ check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s')
+
+ # then the Oracle engine
+# check_results(oracle.OracleDialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':literal')
+
+ # then the sqlite engine
+ check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
+
+ # and the MySQL engine
+ check_results(mysql.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%s')
+
+class CRUDTest(SQLTest):
+ def testinsert(self):
+ # generic insert, will create bind params for all columns
+ self.runtest(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)")
+
+ # insert with user-supplied bind params for specific columns,
+ # cols provided literally
+ self.runtest(
+ insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}),
+ "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
+
+ # insert with user-supplied bind params for specific columns, cols
+ # provided as strings
+ self.runtest(
+ insert(table1, dict(myid = 3, name = 'jack')),
+ "INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
+ )
+
+ # test with a tuple of params instead of named
+ self.runtest(
+ insert(table1, (3, 'jack', 'mydescription')),
+ "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
+ checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'}
+ )
+
+ def testinsertexpression(self):
+ self.runtest(insert(table1), "INSERT INTO mytable (myid) VALUES (lala())", params=dict(myid=func.lala()))
+
+ def testupdate(self):
+ self.runtest(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'})
+ self.runtest(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'})
+ self.runtest(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid")
+ self.runtest(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'})
+ self.runtest(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'})
+ self.runtest(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'mytable_myid': 12, 'myid': 9, 'description': 'test'})
+ s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
+ c = s.compile(parameters = {'mytable_id':9,'name':'h0h0'})
+ self.assert_(str(s) == str(c))
+
+ def testupdateexpression(self):
+ self.runtest(update(table1,
+ (table1.c.myid == func.hoho(4)) &
+ (table1.c.name == literal('foo') + table1.c.name + literal('lala')),
+ values = {
+ table1.c.name : table1.c.name + "lala",
+ table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :liter_2), name=mytable.name + :mytable_name WHERE mytable.myid = hoho(:hoho) AND mytable.name = ((:literal + mytable.name) + :liter_1)")
+
+ def testcorrelatedupdate(self):
+ # test against a straight text subquery
+ u = update(table1, values = {table1.c.name : text("select name from mytable where id=mytable.id")})
+ self.runtest(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)")
+
+ # test against a regular constructed subquery
+ s = select([table2], table2.c.otherid == table1.c.myid)
+ u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
+ self.runtest(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name")
+
+ # test a correlated WHERE clause
+ s = select([table2.c.othername], table2.c.otherid == 7)
+ u = update(table1, table1.c.name==s)
+ self.runtest(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)")
+
+ def testdelete(self):
+ self.runtest(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid")
+
+class SchemaTest(SQLTest):
+ def testselect(self):
+ # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
+ self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
+ self.runtest(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value")
+
+ s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'))
+ s.use_labels = True
+ self.runtest(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value AS remotetable_value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value")
+
+ def testalias(self):
+ a = alias(table4, 'remtable')
+ self.runtest(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id")
+
+ def testupdate(self):
+ self.runtest(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value")
+
+ def testinsert(self):
+ self.runtest(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)")
+
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
new file mode 100755
index 000000000..0c2aa1b56
--- /dev/null
+++ b/test/sql/selectable.py
@@ -0,0 +1,138 @@
+"""tests that various From objects properly export their columns, as well as useable primary keys
+and foreign keys. Full relational algebra depends on every selectable unit behaving
+nicely with others.."""
+
+import testbase
+import unittest, sys, datetime
+
+
+db = testbase.db
+
+from sqlalchemy import *
+
+
+table = Table('table1', db,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)),
+ Column('col3', Integer),
+ Column('colx', Integer),
+ redefine=True
+)
+
+table2 = Table('table2', db,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', Integer, ForeignKey('table1.col1')),
+ Column('col3', String(20)),
+ Column('coly', Integer),
+ redefine=True
+)
+
+class SelectableTest(testbase.AssertMixin):
+ def testtablealias(self):
+ a = table.alias('a')
+
+ j = join(a, table2)
+
+ criterion = a.c.col1 == table2.c.col2
+ print
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testunion(self):
+ # tests that we can correspond a column in a Select statement with a certain Table, against
+ # a column in a Union where one of its underlying Selects matches to that same Table
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ )
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ print ["%d %s" % (id(c),c.key) for c in u.c]
+ c = u.corresponding_column(s1.c.table1_col2)
+ print "%d %s" % (id(c), c.key)
+ assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+
+ def testaliasunion(self):
+ # same as testunion, except its an alias of the union
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_coly) is u.c.coly
+ assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
+
+ def testselectunion(self):
+ # like testaliasunion, but off a Select off the union.
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ s = select([u])
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
+ assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
+
+ def testunionagainstjoin(self):
+ # same as testunion, except its an alias of the union
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ j1 = table.join(table2)
+ assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
+ assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
+
+ def testjoin(self):
+ a = join(table, table2)
+ print str(a.select(use_labels=True))
+ b = table2.alias('b')
+ j = join(a, b)
+ print str(j)
+ criterion = a.c.table1_col1 == b.c.col2
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectalias(self):
+ a = table.select().alias('a')
+ print str(a.select())
+ j = join(a, table2)
+
+ criterion = a.c.col1 == table2.c.col2
+ print
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectlabels(self):
+ a = table.select(use_labels=True)
+ print str(a.select())
+ j = join(a, table2)
+
+ criterion = a.c.table1_col1 == table2.c.col2
+ print
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testcolumnlabels(self):
+ a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
+ print str(a)
+ print [c for c in a.columns]
+ print str(a.select())
+ j = join(a, table2)
+ criterion = a.c.acol1 == table2.c.col2
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectaliaslabels(self):
+ a = table2.select(use_labels=True).alias('a')
+ print str(a.select())
+ j = join(a, table)
+
+ criterion = table.c.col1 == a.c.table2_col2
+ print str(criterion)
+ print str(j.onclause)
+ self.assert_(criterion.compare(j.onclause))
+
+if __name__ == "__main__":
+ testbase.main()
+ \ No newline at end of file
diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py
new file mode 100644
index 000000000..f369bd384
--- /dev/null
+++ b/test/sql/testtypes.py
@@ -0,0 +1,277 @@
+from sqlalchemy import *
+import string,datetime, re, sys
+from testbase import PersistTest, AssertMixin
+import testbase
+import sqlalchemy.engine.url as url
+
+import sqlalchemy.types
+
+# TODO: cant get cPickle to pickle the "Foo" class from this module,
+# now that its moved
+import pickle
+sqlalchemy.types.pickle = pickle
+
+
+db = testbase.db
+
+class MyType(types.TypeEngine):
+ def get_col_spec(self):
+ return "VARCHAR(100)"
+ def convert_bind_param(self, value, engine):
+ return "BIND_IN"+ value
+ def convert_result_value(self, value, engine):
+ return value + "BIND_OUT"
+ def adapt(self, typeobj):
+ return typeobj()
+
+class MyDecoratedType(types.TypeDecorator):
+ impl = String
+ def convert_bind_param(self, value, engine):
+ return "BIND_IN"+ value
+ def convert_result_value(self, value, engine):
+ return value + "BIND_OUT"
+ def copy(self):
+ return MyDecoratedType()
+
+class MyUnicodeType(types.Unicode):
+ def convert_bind_param(self, value, engine):
+ return "UNI_BIND_IN"+ value
+ def convert_result_value(self, value, engine):
+ return value + "UNI_BIND_OUT"
+ def copy(self):
+ return MyUnicodeType(self.impl.length)
+
+class AdaptTest(PersistTest):
+ def testadapt(self):
+ e1 = url.URL('postgres').get_module().dialect()
+ e2 = url.URL('mysql').get_module().dialect()
+ e3 = url.URL('sqlite').get_module().dialect()
+
+ type = String(40)
+
+ t1 = type.dialect_impl(e1)
+ t2 = type.dialect_impl(e2)
+ t3 = type.dialect_impl(e3)
+ assert t1 != t2
+ assert t2 != t3
+ assert t3 != t1
+
+ def testdecorator(self):
+ t1 = Unicode(20)
+ t2 = Unicode()
+ assert isinstance(t1.impl, String)
+ assert not isinstance(t1.impl, TEXT)
+ assert (t1.impl.length == 20)
+ assert isinstance(t2.impl, TEXT)
+ assert t2.impl.length is None
+
+class OverrideTest(PersistTest):
+ """tests user-defined types, including a full type as well as a TypeDecorator"""
+
+ def testprocessing(self):
+
+ global users
+ users.insert().execute(user_id = 2, goofy = 'jack', goofy2='jack', goofy3='jack', goofy4='jack')
+ users.insert().execute(user_id = 3, goofy = 'lala', goofy2='lala', goofy3='lala', goofy4='lala')
+ users.insert().execute(user_id = 4, goofy = 'fred', goofy2='fred', goofy3='fred', goofy4='fred')
+
+ l = users.select().execute().fetchall()
+ print repr(l)
+ self.assert_(l == [(2, 'BIND_INjackBIND_OUT', 'BIND_INjackBIND_OUT', 'BIND_INjackBIND_OUT', u'UNI_BIND_INjackUNI_BIND_OUT'), (3, 'BIND_INlalaBIND_OUT', 'BIND_INlalaBIND_OUT', 'BIND_INlalaBIND_OUT', u'UNI_BIND_INlalaUNI_BIND_OUT'), (4, 'BIND_INfredBIND_OUT', 'BIND_INfredBIND_OUT', 'BIND_INfredBIND_OUT', u'UNI_BIND_INfredUNI_BIND_OUT')])
+
+ def setUpAll(self):
+ global users
+ users = Table('type_users', db,
+ Column('user_id', Integer, primary_key = True),
+ # totall custom type
+ Column('goofy', MyType, nullable = False),
+
+ # decorated type with an argument, so its a String
+ Column('goofy2', MyDecoratedType(50), nullable = False),
+
+ # decorated type without an argument, it will adapt_args to TEXT
+ Column('goofy3', MyDecoratedType, nullable = False),
+
+ Column('goofy4', MyUnicodeType, nullable = False),
+
+ )
+
+ users.create()
+ def tearDownAll(self):
+ global users
+ users.drop()
+
+
+class ColumnsTest(AssertMixin):
+
+ def testcolumns(self):
+ expectedResults = { 'int_column': 'int_column INTEGER',
+ 'smallint_column': 'smallint_column SMALLINT',
+ 'varchar_column': 'varchar_column VARCHAR(20)',
+ 'numeric_column': 'numeric_column NUMERIC(12, 3)',
+ 'float_column': 'float_column NUMERIC(25, 2)'
+ }
+
+ if not db.name=='sqlite':
+ expectedResults['float_column'] = 'float_column FLOAT(25)'
+
+ print db.engine.__module__
+ testTable = Table('testColumns', db,
+ Column('int_column', Integer),
+ Column('smallint_column', Smallinteger),
+ Column('varchar_column', String(20)),
+ Column('numeric_column', Numeric(12,3)),
+ Column('float_column', Float(25)),
+ )
+
+ for aCol in testTable.c:
+ self.assertEquals(expectedResults[aCol.name], db.dialect.schemagenerator(db, None).get_column_specification(aCol))
+
+class UnicodeTest(AssertMixin):
+ """tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
+ def setUpAll(self):
+ global unicode_table
+ unicode_table = Table('unicode_table', db,
+ Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
+ Column('unicode_data', Unicode(250)),
+ Column('plain_data', String(250))
+ )
+ unicode_table.create()
+ def tearDownAll(self):
+ unicode_table.drop()
+ def testbasic(self):
+ rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
+ unicodedata = rawdata.decode('utf-8')
+ unicode_table.insert().execute(unicode_data=unicodedata, plain_data=rawdata)
+ x = unicode_table.select().execute().fetchone()
+ self.echo(repr(x['unicode_data']))
+ self.echo(repr(x['plain_data']))
+ self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
+ if isinstance(x['plain_data'], unicode):
+ # SQLLite returns even non-unicode data as unicode
+ self.assert_(db.name == 'sqlite')
+ self.echo("its sqlite !")
+ else:
+ self.assert_(not isinstance(x['plain_data'], unicode) and x['plain_data'] == rawdata)
+ def testengineparam(self):
+ """tests engine-wide unicode conversion"""
+ prev_unicode = db.engine.dialect.convert_unicode
+ try:
+ db.engine.dialect.convert_unicode = True
+ rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
+ unicodedata = rawdata.decode('utf-8')
+ unicode_table.insert().execute(unicode_data=unicodedata, plain_data=rawdata)
+ x = unicode_table.select().execute().fetchone()
+ self.echo(repr(x['unicode_data']))
+ self.echo(repr(x['plain_data']))
+ self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
+ self.assert_(isinstance(x['plain_data'], unicode) and x['plain_data'] == unicodedata)
+ finally:
+ db.engine.dialect.convert_unicode = prev_unicode
+
+class Foo(object):
+ def __init__(self, moredata):
+ self.data = 'im data'
+ self.stuff = 'im stuff'
+ self.moredata = moredata
+ def __eq__(self, other):
+ return other.data == self.data and other.stuff == self.stuff and other.moredata==self.moredata
+
+class BinaryTest(AssertMixin):
+ def setUpAll(self):
+ global binary_table
+ binary_table = Table('binary_table', db,
+ Column('primary_id', Integer, primary_key=True),
+ Column('data', Binary),
+ Column('data_slice', Binary(100)),
+ Column('misc', String(30)),
+ Column('pickled', PickleType)
+ )
+ binary_table.create()
+ def tearDownAll(self):
+ binary_table.drop()
+ def testbinary(self):
+ testobj1 = Foo('im foo 1')
+ testobj2 = Foo('im foo 2')
+
+ stream1 =self.get_module_stream('sqlalchemy.sql')
+ stream2 =self.get_module_stream('sqlalchemy.schema')
+ binary_table.insert().execute(primary_id=1, misc='sql.pyc', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
+ binary_table.insert().execute(primary_id=2, misc='schema.pyc', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
+ l = binary_table.select().execute().fetchall()
+ print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
+ self.assert_(list(stream1) == list(l[0]['data']))
+ self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
+ self.assert_(list(stream2) == list(l[1]['data']))
+ self.assert_(testobj1 == l[0]['pickled'])
+ self.assert_(testobj2 == l[1]['pickled'])
+
+ def get_module_stream(self, name):
+ mod = __import__(name)
+ for token in name.split('.')[1:]:
+ mod = getattr(mod, token)
+ f = mod.__file__
+ f = re.sub('\.py$', '\.pyc', f)
+ # put a number less than the typical MySQL default BLOB size
+ return file(f).read(59473)
+
+class DateTest(AssertMixin):
+ def setUpAll(self):
+ global users_with_date, insert_data
+
+ insert_data = [
+ [7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.time(12,20,2)],
+ [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)],
+ [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)],
+ [10, 'colber', None, None, None]
+ ]
+
+ fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
+
+ collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
+ Column('user_date', Date), Column('user_time', Time)]
+
+ if db.engine.name == 'mysql' or db.engine.name == 'mssql':
+ # strip microseconds -- not supported by this engine (should be an easier way to detect this)
+ for d in insert_data:
+ if d[2] is not None:
+ d[2] = d[2].replace(microsecond=0)
+ if d[4] is not None:
+ d[4] = d[4].replace(microsecond=0)
+
+ try:
+ db.type_descriptor(types.TIME).get_col_spec()
+ except:
+ # don't test TIME type -- not supported by this engine
+ insert_data = [d[:-1] for d in insert_data]
+ fnames = fnames[:-1]
+ collist = collist[:-1]
+
+ users_with_date = Table('query_users_with_date', db, redefine = True, *collist)
+ users_with_date.create()
+ insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
+
+ for idict in insert_dicts:
+ users_with_date.insert().execute(**idict) # insert the data
+
+ def tearDownAll(self):
+ users_with_date.drop()
+
+ def testdate(self):
+ global insert_data
+
+ l = map(list, users_with_date.select().execute().fetchall())
+ self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data))
+
+
+ def testtextdate(self):
+ x = db.text("select user_datetime from query_users_with_date", typemap={'user_datetime':DateTime}).execute().fetchall()
+
+ print repr(x)
+ self.assert_(isinstance(x[0][0], datetime.datetime))
+
+ #x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
+ #print repr(x)
+
+if __name__ == "__main__":
+ testbase.main()