diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
| commit | 45cec095b4904ba71425d2fe18c143982dd08f43 (patch) | |
| tree | af5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/defaults.py | |
| parent | 698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff) | |
| download | sqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz | |
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run
the tests. [ticket:970]
Diffstat (limited to 'test/sql/defaults.py')
| -rw-r--r-- | test/sql/defaults.py | 634 |
1 files changed, 0 insertions, 634 deletions
diff --git a/test/sql/defaults.py b/test/sql/defaults.py deleted file mode 100644 index bea6dc04b..000000000 --- a/test/sql/defaults.py +++ /dev/null @@ -1,634 +0,0 @@ -import testenv; testenv.configure_for_tests() -import datetime -from sqlalchemy import Sequence, Column, func -from sqlalchemy.sql import select, text -from testlib import sa, testing -from testlib.sa import MetaData, Table, Integer, String, ForeignKey, Boolean -from testlib.testing import eq_ -from sql import _base - - -class DefaultTest(testing.TestBase): - - def setUpAll(self): - global t, f, f2, ts, currenttime, metadata, default_generator - - db = testing.db - metadata = MetaData(db) - default_generator = {'x':50} - - def mydefault(): - default_generator['x'] += 1 - return default_generator['x'] - - def myupdate_with_ctx(ctx): - conn = ctx.connection - return conn.execute(sa.select([sa.text('13')])).scalar() - - def mydefault_using_connection(ctx): - conn = ctx.connection - try: - return conn.execute(sa.select([sa.text('12')])).scalar() - finally: - # ensure a "close()" on this connection does nothing, - # since its a "branched" connection - conn.close() - - use_function_defaults = testing.against('postgres', 'mssql', 'maxdb') - is_oracle = testing.against('oracle') - - # select "count(1)" returns different results on different DBs also - # correct for "current_date" compatible as column default, value - # differences - currenttime = func.current_date(type_=sa.Date, bind=db) - if is_oracle: - ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')])) - assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime) - f = sa.select([func.length('abcdef')], bind=db).scalar() - f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() - # TODO: engine propigation across nested functions not working - currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date) - def1 = currenttime - def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) - - deftype = sa.Date - elif use_function_defaults: - f = sa.select([func.length('abcdef')], bind=db).scalar() - f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() - def1 = currenttime - deftype = sa.Date - if testing.against('maxdb'): - def2 = sa.text("curdate") - elif testing.against('mssql'): - def2 = sa.text("getdate()") - else: - def2 = sa.text("current_date") - ts = db.func.current_date().scalar() - else: - f = len('abcdef') - f2 = len('abcdefghijk') - def1 = def2 = "3" - ts = 3 - deftype = Integer - - t = Table('default_test1', metadata, - # python function - Column('col1', Integer, primary_key=True, - default=mydefault), - - # python literal - Column('col2', String(20), - default="imthedefault", - onupdate="im the update"), - - # preexecute expression - Column('col3', Integer, - default=func.length('abcdef'), - onupdate=func.length('abcdefghijk')), - - # SQL-side default from sql expression - Column('col4', deftype, - server_default=def1), - - # SQL-side default from literal expression - Column('col5', deftype, - server_default=def2), - - # preexecute + update timestamp - Column('col6', sa.Date, - default=currenttime, - onupdate=currenttime), - - Column('boolcol1', sa.Boolean, default=True), - Column('boolcol2', sa.Boolean, default=False), - - # python function which uses ExecutionContext - Column('col7', Integer, - default=mydefault_using_connection, - onupdate=myupdate_with_ctx), - - # python builtin - Column('col8', sa.Date, - default=datetime.date.today, - onupdate=datetime.date.today), - # combo - Column('col9', String(20), - default='py', - server_default='ddl')) - t.create() - - def tearDownAll(self): - t.drop() - - def tearDown(self): - default_generator['x'] = 50 - t.delete().execute() - - def test_bad_arg_signature(self): - ex_msg = \ - "ColumnDefault Python function takes zero or one positional arguments" - - def fn1(x, y): pass - def fn2(x, y, z=3): pass - class fn3(object): - def __init__(self, x, y): - pass - class FN4(object): - def __call__(self, x, y): - pass - fn4 = FN4() - - for fn in fn1, fn2, fn3, fn4: - self.assertRaisesMessage(sa.exc.ArgumentError, - ex_msg, - sa.ColumnDefault, fn) - - def test_arg_signature(self): - def fn1(): pass - def fn2(): pass - def fn3(x=1): pass - def fn4(x=1, y=2, z=3): pass - fn5 = list - class fn6(object): - def __init__(self, x): - pass - class fn6(object): - def __init__(self, x, y=3): - pass - class FN7(object): - def __call__(self, x): - pass - fn7 = FN7() - class FN8(object): - def __call__(self, x, y=3): - pass - fn8 = FN8() - - for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8: - c = sa.ColumnDefault(fn) - - @testing.fails_on('firebird', 'Data type unknown') - def test_standalone(self): - c = testing.db.engine.contextual_connect() - x = c.execute(t.c.col1.default) - y = t.c.col2.default.execute() - z = c.execute(t.c.col3.default) - assert 50 <= x <= 57 - eq_(y, 'imthedefault') - eq_(z, f) - eq_(f2, 11) - - def test_py_vs_server_default_detection(self): - - def has_(name, *wanted): - slots = ['default', 'onupdate', 'server_default', 'server_onupdate'] - col = tbl.c[name] - for slot in wanted: - slots.remove(slot) - assert getattr(col, slot) is not None, getattr(col, slot) - for slot in slots: - assert getattr(col, slot) is None, getattr(col, slot) - - tbl = t - has_('col1', 'default') - has_('col2', 'default', 'onupdate') - has_('col3', 'default', 'onupdate') - has_('col4', 'server_default') - has_('col5', 'server_default') - has_('col6', 'default', 'onupdate') - has_('boolcol1', 'default') - has_('boolcol2', 'default') - has_('col7', 'default', 'onupdate') - has_('col8', 'default', 'onupdate') - has_('col9', 'default', 'server_default') - - ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause - - t2 = Table('t2', MetaData(), - Column('col1', Integer, Sequence('foo')), - Column('col2', Integer, - default=Sequence('foo'), - server_default='y'), - Column('col3', Integer, - Sequence('foo'), - server_default='x'), - Column('col4', Integer, - ColumnDefault('x'), - DefaultClause('y')), - Column('col4', Integer, - ColumnDefault('x'), - DefaultClause('y'), - DefaultClause('y', for_update=True)), - Column('col5', Integer, - ColumnDefault('x'), - DefaultClause('y'), - onupdate='z'), - Column('col6', Integer, - ColumnDefault('x'), - server_default='y', - onupdate='z'), - Column('col7', Integer, - default='x', - server_default='y', - onupdate='z'), - Column('col8', Integer, - server_onupdate='u', - default='x', - server_default='y', - onupdate='z')) - tbl = t2 - has_('col1', 'default') - has_('col2', 'default', 'server_default') - has_('col3', 'default', 'server_default') - has_('col4', 'default', 'server_default', 'server_onupdate') - has_('col5', 'default', 'server_default', 'onupdate') - has_('col6', 'default', 'server_default', 'onupdate') - has_('col7', 'default', 'server_default', 'onupdate') - has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate') - - @testing.fails_on('firebird', 'Data type unknown') - def test_insert(self): - r = t.insert().execute() - assert r.lastrow_has_defaults() - eq_(set(r.context.postfetch_cols), - set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) - - r = t.insert(inline=True).execute() - assert r.lastrow_has_defaults() - eq_(set(r.context.postfetch_cols), - set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) - - t.insert().execute() - - ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar() - l = t.select().order_by(t.c.col1).execute() - today = datetime.date.today() - eq_(l.fetchall(), [ - (x, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py') - for x in range(51, 54)]) - - t.insert().execute(col9=None) - assert r.lastrow_has_defaults() - eq_(set(r.context.postfetch_cols), - set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) - - eq_(t.select(t.c.col1==54).execute().fetchall(), - [(54, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, None)]) - - @testing.fails_on('firebird', 'Data type unknown') - def test_insertmany(self): - # MySQL-Python 1.2.2 breaks functions in execute_many :( - if (testing.against('mysql') and - testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): - return - - r = t.insert().execute({}, {}, {}) - - ctexec = currenttime.scalar() - l = t.select().execute() - today = datetime.date.today() - eq_(l.fetchall(), - [(51, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py'), - (52, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py'), - (53, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py')]) - - def test_insert_values(self): - t.insert(values={'col3':50}).execute() - l = t.select().execute() - eq_(50, l.fetchone()['col3']) - - @testing.fails_on('firebird', 'Data type unknown') - def test_updatemany(self): - # MySQL-Python 1.2.2 breaks functions in execute_many :( - if (testing.against('mysql') and - testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): - return - - t.insert().execute({}, {}, {}) - - t.update(t.c.col1==sa.bindparam('pkval')).execute( - {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False}) - - t.update(t.c.col1==sa.bindparam('pkval')).execute( - {'pkval':51,}, - {'pkval':52,}, - {'pkval':53,}) - - l = t.select().execute() - ctexec = currenttime.scalar() - today = datetime.date.today() - eq_(l.fetchall(), - [(51, 'im the update', f2, ts, ts, ctexec, False, False, - 13, today, 'py'), - (52, 'im the update', f2, ts, ts, ctexec, True, False, - 13, today, 'py'), - (53, 'im the update', f2, ts, ts, ctexec, True, False, - 13, today, 'py')]) - - @testing.fails_on('firebird', 'Data type unknown') - def test_update(self): - r = t.insert().execute() - pk = r.last_inserted_ids()[0] - t.update(t.c.col1==pk).execute(col4=None, col5=None) - ctexec = currenttime.scalar() - l = t.select(t.c.col1==pk).execute() - l = l.fetchone() - eq_(l, - (pk, 'im the update', f2, None, None, ctexec, True, False, - 13, datetime.date.today(), 'py')) - eq_(11, f2) - - @testing.fails_on('firebird', 'Data type unknown') - def test_update_values(self): - r = t.insert().execute() - pk = r.last_inserted_ids()[0] - t.update(t.c.col1==pk, values={'col3': 55}).execute() - l = t.select(t.c.col1==pk).execute() - l = l.fetchone() - eq_(55, l['col3']) - - @testing.fails_on_everything_except('postgres') - def test_passive_override(self): - """ - Primarily for postgres, tests that when we get a primary key column - back from reflecting a table which has a default value on it, we - pre-execute that DefaultClause upon insert, even though DefaultClause - says "let the database execute this", because in postgres we must have - all the primary key values in memory before insert; otherwise we can't - locate the just inserted row. - - """ - # TODO: move this to dialect/postgres - try: - meta = MetaData(testing.db) - testing.db.execute(""" - CREATE TABLE speedy_users - ( - speedy_user_id SERIAL PRIMARY KEY, - - user_name VARCHAR NOT NULL, - user_password VARCHAR NOT NULL - ); - """, None) - - t = Table("speedy_users", meta, autoload=True) - t.insert().execute(user_name='user', user_password='lala') - l = t.select().execute().fetchall() - eq_(l, [(1, 'user', 'lala')]) - finally: - testing.db.execute("drop table speedy_users", None) - - -class PKDefaultTest(_base.TablesTest): - __requires__ = ('subqueries',) - - def define_tables(self, metadata): - t2 = Table('t2', metadata, - Column('nextid', Integer)) - - Table('t1', metadata, - Column('id', Integer, primary_key=True, - default=sa.select([func.max(t2.c.nextid)]).as_scalar()), - Column('data', String(30))) - - @testing.fails_on('mssql', 'FIXME: unknown') - @testing.resolve_artifact_names - def test_basic(self): - t2.insert().execute(nextid=1) - r = t1.insert().execute(data='hi') - eq_([1], r.last_inserted_ids()) - - t2.insert().execute(nextid=2) - r = t1.insert().execute(data='there') - eq_([2], r.last_inserted_ids()) - - -class PKIncrementTest(_base.TablesTest): - run_define_tables = 'each' - - def define_tables(self, metadata): - Table("aitable", metadata, - Column('id', Integer, Sequence('ai_id_seq', optional=True), - primary_key=True), - Column('int1', Integer), - Column('str1', String(20))) - - # TODO: add coverage for increment on a secondary column in a key - @testing.fails_on('firebird', 'Data type unknown') - @testing.resolve_artifact_names - def _test_autoincrement(self, bind): - ids = set() - rs = bind.execute(aitable.insert(), int1=1) - last = rs.last_inserted_ids()[0] - self.assert_(last) - self.assert_(last not in ids) - ids.add(last) - - rs = bind.execute(aitable.insert(), str1='row 2') - last = rs.last_inserted_ids()[0] - self.assert_(last) - self.assert_(last not in ids) - ids.add(last) - - rs = bind.execute(aitable.insert(), int1=3, str1='row 3') - last = rs.last_inserted_ids()[0] - self.assert_(last) - self.assert_(last not in ids) - ids.add(last) - - rs = bind.execute(aitable.insert(values={'int1':func.length('four')})) - last = rs.last_inserted_ids()[0] - self.assert_(last) - self.assert_(last not in ids) - ids.add(last) - - eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))), - [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)]) - - @testing.resolve_artifact_names - def test_autoincrement_autocommit(self): - self._test_autoincrement(testing.db) - - @testing.resolve_artifact_names - def test_autoincrement_transaction(self): - con = testing.db.connect() - tx = con.begin() - try: - try: - self._test_autoincrement(con) - except: - try: - tx.rollback() - except: - pass - raise - else: - tx.commit() - finally: - con.close() - - -class EmptyInsertTest(testing.TestBase): - @testing.exclude('sqlite', '<', (3, 3, 8), 'no empty insert support') - @testing.fails_on('oracle', 'FIXME: unknown') - def test_empty_insert(self): - metadata = MetaData(testing.db) - t1 = Table('t1', metadata, - Column('is_true', Boolean, server_default=('1'))) - metadata.create_all() - - try: - result = t1.insert().execute() - self.assertEquals(1, select([func.count(text('*'))], from_obj=t1).scalar()) - self.assertEquals(True, t1.select().scalar()) - finally: - metadata.drop_all() - -class AutoIncrementTest(_base.TablesTest): - __requires__ = ('identity',) - run_define_tables = 'each' - - def define_tables(self, metadata): - """Each test manipulates self.metadata individually.""" - - @testing.exclude('sqlite', '<', (3, 4), 'no database support') - def test_autoincrement_single_col(self): - single = Table('single', self.metadata, - Column('id', Integer, primary_key=True)) - single.create() - - r = single.insert().execute() - id_ = r.last_inserted_ids()[0] - assert id_ is not None - eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar()) - - def test_autoincrement_fk(self): - nodes = Table('nodes', self.metadata, - Column('id', Integer, primary_key=True), - Column('parent_id', Integer, ForeignKey('nodes.id')), - Column('data', String(30))) - nodes.create() - - r = nodes.insert().execute(data='foo') - id_ = r.last_inserted_ids()[0] - nodes.insert().execute(data='bar', parent_id=id_) - - @testing.fails_on('sqlite', 'FIXME: unknown') - def test_non_autoincrement(self): - # sqlite INT primary keys can be non-unique! (only for ints) - nonai = Table("nonaitest", self.metadata, - Column('id', Integer, autoincrement=False, primary_key=True), - Column('data', String(20))) - nonai.create() - - - try: - # postgres + mysql strict will fail on first row, - # mysql in legacy mode fails on second row - nonai.insert().execute(data='row 1') - nonai.insert().execute(data='row 2') - assert False - except sa.exc.SQLError, e: - assert True - - nonai.insert().execute(id=1, data='row 1') - - -class SequenceTest(testing.TestBase): - __requires__ = ('sequences',) - - def setUpAll(self): - global cartitems, sometable, metadata - metadata = MetaData(testing.db) - cartitems = Table("cartitems", metadata, - Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), - Column("description", String(40)), - Column("createdate", sa.DateTime()) - ) - sometable = Table( 'Manager', metadata, - Column('obj_id', Integer, Sequence('obj_id_seq'), ), - Column('name', String(128)), - Column('id', Integer, Sequence('Manager_id_seq', optional=True), - primary_key=True), - ) - - metadata.create_all() - - def testseqnonpk(self): - """test sequences fire off as defaults on non-pk columns""" - - result = sometable.insert().execute(name="somename") - assert 'id' in result.postfetch_cols() - - result = sometable.insert().execute(name="someother") - assert 'id' in result.postfetch_cols() - - sometable.insert().execute( - {'name':'name3'}, - {'name':'name4'}) - eq_(sometable.select().execute().fetchall(), - [(1, "somename", 1), - (2, "someother", 2), - (3, "name3", 3), - (4, "name4", 4)]) - - def testsequence(self): - cartitems.insert().execute(description='hi') - cartitems.insert().execute(description='there') - r = cartitems.insert().execute(description='lala') - - assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None - id_ = r.last_inserted_ids()[0] - - eq_(1, - sa.select([func.count(cartitems.c.cart_id)], - sa.and_(cartitems.c.description == 'lala', - cartitems.c.cart_id == id_)).scalar()) - - cartitems.select().execute().fetchall() - - @testing.fails_on('maxdb', 'FIXME: unknown') - # maxdb db-api seems to double-execute NEXTVAL internally somewhere, - # throwing off the numbers for these tests... - def test_implicit_sequence_exec(self): - s = Sequence("my_sequence", metadata=MetaData(testing.db)) - s.create() - try: - x = s.execute() - eq_(x, 1) - finally: - s.drop() - - @testing.fails_on('maxdb', 'FIXME: unknown') - def teststandalone_explicit(self): - s = Sequence("my_sequence") - s.create(bind=testing.db) - try: - x = s.execute(testing.db) - eq_(x, 1) - finally: - s.drop(testing.db) - - def test_checkfirst(self): - s = Sequence("my_sequence") - s.create(testing.db, checkfirst=False) - s.create(testing.db, checkfirst=True) - s.drop(testing.db, checkfirst=False) - s.drop(testing.db, checkfirst=True) - - @testing.fails_on('maxdb', 'FIXME: unknown') - def teststandalone2(self): - x = cartitems.c.cart_id.sequence.execute() - self.assert_(1 <= x <= 4) - - def tearDownAll(self): - metadata.drop_all() - - -if __name__ == "__main__": - testenv.main() |
