diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-04-01 13:57:44 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-04-01 13:57:44 -0400 |
| commit | 5ee1bb09de3b9675d482d608a0474c89f93259e4 (patch) | |
| tree | 4961455a758dcae42def4562e6a1a3f846338858 /test/sql | |
| parent | 82b6e074920cb972a569db4d2d395c8949868a31 (diff) | |
| parent | 25c6732019550e6f26ae4da58084b939e04cffa1 (diff) | |
| download | sqlalchemy-5ee1bb09de3b9675d482d608a0474c89f93259e4.tar.gz | |
merge default
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_compiler.py | 369 | ||||
| -rw-r--r-- | test/sql/test_delete.py | 86 | ||||
| -rw-r--r-- | test/sql/test_insert.py | 302 | ||||
| -rw-r--r-- | test/sql/test_labels.py | 102 | ||||
| -rw-r--r-- | test/sql/test_update.py | 646 |
5 files changed, 837 insertions, 668 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index fa15c7aa2..c3638f52f 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -14,8 +14,8 @@ from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message from sqlalchemy import testing from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ - func, not_, cast, text, tuple_, exists, delete, update, bindparam,\ - insert, literal, and_, null, type_coerce, alias, or_, literal_column,\ + func, not_, cast, text, tuple_, exists, update, bindparam,\ + literal, and_, null, type_coerce, alias, or_, literal_column,\ Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ over, subquery, case @@ -2460,326 +2460,10 @@ class KwargPropagationTest(fixtures.TestBase): class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' - def test_insert(self): - # generic insert, will create bind params for all columns - self.assert_compile(insert(table1), - "INSERT INTO mytable (myid, name, description) " - "VALUES (:myid, :name, :description)") - - # insert with user-supplied bind params for specific columns, - # cols provided literally - self.assert_compile( - insert(table1, { - table1.c.myid: bindparam('userid'), - table1.c.name: bindparam('username')}), - "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") - - # insert with user-supplied bind params for specific columns, cols - # provided as strings - self.assert_compile( - insert(table1, dict(myid=3, name='jack')), - "INSERT INTO mytable (myid, name) VALUES (:myid, :name)" - ) - - # test with a tuple of params instead of named - self.assert_compile( - insert(table1, (3, 'jack', 'mydescription')), - "INSERT INTO mytable (myid, name, description) VALUES " - "(:myid, :name, :description)", - checkparams={ - 'myid': 3, 'name': 'jack', 'description': 'mydescription'} - ) - - self.assert_compile( - insert(table1, values={ - table1.c.myid: bindparam('userid') - }).values( - {table1.c.name: bindparam('username')}), - "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" - ) - - self.assert_compile( - insert(table1, values=dict(myid=func.lala())), - "INSERT INTO mytable (myid) VALUES (lala())") - - def test_insert_prefix(self): - stmt = table1.insert().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "INSERT A B C D INTO mytable (myid, name, description) " - "VALUES (%s, %s, %s)", dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "INSERT C D INTO mytable (myid, name, description) " - "VALUES (:myid, :name, :description)") - - def test_inline_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('foo', Integer, default=func.foobar())) - self.assert_compile( - table.insert(values={}, inline=True), - "INSERT INTO sometable (foo) VALUES (foobar())") - self.assert_compile( - table.insert(inline=True), - "INSERT INTO sometable (foo) VALUES (foobar())", params={}) - def test_insert_returning_not_in_default(self): stmt = table1.insert().returning(table1.c.myid) - assert_raises_message( - exc.CompileError, - "RETURNING is not supported by this dialect's statement compiler.", - stmt.compile - ) - - def test_empty_insert_default(self): - stmt = table1.insert().values({}) # hide from 2to3 - self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()") - - def test_empty_insert_default_values(self): - stmt = table1.insert().values({}) # hide from 2to3 - dialect = default.DefaultDialect() - dialect.supports_empty_insert = dialect.supports_default_values = True - self.assert_compile(stmt, "INSERT INTO mytable DEFAULT VALUES", - dialect=dialect) - - def test_empty_insert_not_supported(self): - stmt = table1.insert().values({}) # hide from 2to3 - dialect = default.DefaultDialect() - dialect.supports_empty_insert = dialect.supports_default_values = False - assert_raises_message( - exc.CompileError, - "The 'default' dialect with current database version " - "settings does not support empty inserts.", - stmt.compile, dialect=dialect - ) - - def test_multivalues_insert_not_supported(self): - stmt = table1.insert().values([{"myid": 1}, {"myid": 2}]) - dialect = default.DefaultDialect() - assert_raises_message( - exc.CompileError, - "The 'default' dialect with current database version settings " - "does not support in-place multirow inserts.", - stmt.compile, dialect=dialect - ) - - def test_multivalues_insert_named(self): - stmt = table1.insert().\ - values([{"myid": 1, "name": 'a', "description": 'b'}, - {"myid": 2, "name": 'c', "description": 'd'}, - {"myid": 3, "name": 'e', "description": 'f'} - ]) - - result = "INSERT INTO mytable (myid, name, description) VALUES " \ - "(:myid_0, :name_0, :description_0), " \ - "(:myid_1, :name_1, :description_1), " \ - "(:myid_2, :name_2, :description_2)" - - dialect = default.DefaultDialect() - dialect.supports_multivalues_insert = True - self.assert_compile(stmt, result, - checkparams={ - 'description_2': 'f', 'name_2': 'e', - 'name_0': 'a', 'name_1': 'c', 'myid_2': 3, - 'description_0': 'b', 'myid_0': 1, - 'myid_1': 2, 'description_1': 'd' - }, - dialect=dialect) - - def test_multivalues_insert_positional(self): - stmt = table1.insert().\ - values([{"myid": 1, "name": 'a', "description": 'b'}, - {"myid": 2, "name": 'c', "description": 'd'}, - {"myid": 3, "name": 'e', "description": 'f'} - ]) - - result = "INSERT INTO mytable (myid, name, description) VALUES " \ - "(%s, %s, %s), " \ - "(%s, %s, %s), " \ - "(%s, %s, %s)" \ - - dialect = default.DefaultDialect() - dialect.supports_multivalues_insert = True - dialect.paramstyle = "format" - dialect.positional = True - self.assert_compile(stmt, result, - checkpositional=(1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f'), - dialect=dialect) - - def test_multirow_inline_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('data', String), - Column('foo', Integer, default=func.foobar())) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1"}, - {"id": 2, "data": "data2", "foo": "plainfoo"}, - {"id": 3, "data": "data3"}, - ]) - result = "INSERT INTO sometable (id, data, foo) VALUES "\ - "(%(id_0)s, %(data_0)s, foobar()), "\ - "(%(id_1)s, %(data_1)s, %(foo_1)s), "\ - "(%(id_2)s, %(data_2)s, foobar())" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'foo_1': 'plainfoo', 'data_1': 'data2', - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - def test_multirow_server_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('data', String), - Column('foo', Integer, server_default=func.foobar())) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1"}, - {"id": 2, "data": "data2", "foo": "plainfoo"}, - {"id": 3, "data": "data3"}, - ]) - result = "INSERT INTO sometable (id, data) VALUES "\ - "(%(id_0)s, %(data_0)s), "\ - "(%(id_1)s, %(data_1)s), "\ - "(%(id_2)s, %(data_2)s)" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'data_1': 'data2', - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1", "foo": "plainfoo"}, - {"id": 2, "data": "data2"}, - {"id": 3, "data": "data3", "foo": "otherfoo"}, - ]) - - # note the effect here is that the first set of params - # takes effect for the rest of them, when one is absent - result = "INSERT INTO sometable (id, data, foo) VALUES "\ - "(%(id_0)s, %(data_0)s, %(foo_0)s), "\ - "(%(id_1)s, %(data_1)s, %(foo_0)s), "\ - "(%(id_2)s, %(data_2)s, %(foo_2)s)" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'data_1': 'data2', - "foo_0": "plainfoo", - "foo_2": "otherfoo", - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - def test_update(self): - self.assert_compile( - update(table1, table1.c.myid == 7), - "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", - params={table1.c.name: 'fred'}) - self.assert_compile( - table1.update().where(table1.c.myid == 7). - values({table1.c.myid: 5}), - "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", - checkparams={'myid': 5, 'myid_1': 7}) - self.assert_compile( - update(table1, table1.c.myid == 7), - "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", - params={'name': 'fred'}) - self.assert_compile( - update(table1, values={table1.c.name: table1.c.myid}), - "UPDATE mytable SET name=mytable.myid") - self.assert_compile( - update(table1, - whereclause=table1.c.name == bindparam('crit'), - values={table1.c.name: 'hi'}), - "UPDATE mytable SET name=:name WHERE mytable.name = :crit", - params={'crit': 'notthere'}, - checkparams={'crit': 'notthere', 'name': 'hi'}) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.name: table1.c.myid}), - "UPDATE mytable SET name=mytable.myid, description=" - ":description WHERE mytable.myid = :myid_1", - params={'description': 'test'}, - checkparams={'description': 'test', 'myid_1': 12}) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.myid: 9}), - "UPDATE mytable SET myid=:myid, description=:description " - "WHERE mytable.myid = :myid_1", - params={'myid_1': 12, 'myid': 9, 'description': 'test'}) - self.assert_compile( - update(table1, table1.c.myid == 12), - "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", - params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12}) - s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'}) - c = s.compile(column_keys=['id', 'name']) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.name: table1.c.myid} - ).values({table1.c.name: table1.c.name + 'foo'}), - "UPDATE mytable SET name=(mytable.name || :name_1), " - "description=:description WHERE mytable.myid = :myid_1", - params={'description': 'test'}) - eq_(str(s), str(c)) - - self.assert_compile(update(table1, - (table1.c.myid == func.hoho(4)) & - (table1.c.name == literal('foo') + - table1.c.name + literal('lala')), - values={ - table1.c.name: table1.c.name + "lala", - table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho')) - }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), " - "name=(mytable.name || :name_1) " - "WHERE mytable.myid = hoho(:hoho_1) " - "AND mytable.name = :param_2 || " - "mytable.name || :param_3") - - def test_update_prefix(self): - stmt = table1.update().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "UPDATE A B C D mytable SET myid=%s, name=%s, description=%s", - dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "UPDATE C D mytable SET myid=:myid, name=:name, " - "description=:description") - - def test_aliased_update(self): - talias1 = table1.alias('t1') - self.assert_compile( - update(talias1, talias1.c.myid == 7), - "UPDATE mytable AS t1 SET name=:name WHERE t1.myid = :myid_1", - params={table1.c.name: 'fred'}) - self.assert_compile( - update(talias1, table1.c.myid == 7), - "UPDATE mytable AS t1 SET name=:name FROM " - "mytable WHERE mytable.myid = :myid_1", - params={table1.c.name: 'fred'}) - - def test_update_to_expression(self): - """test update from an expression. - - this logic is triggered currently by a left side that doesn't - have a key. The current supported use case is updating the index - of a Postgresql ARRAY type. - - """ - expr = func.foo(table1.c.myid) - assert not hasattr(expr, "key") - self.assert_compile( - table1.update().values({expr: 'bar'}), - "UPDATE mytable SET foo(myid)=:param_1" - ) + m = "RETURNING is not supported by this dialect's statement compiler." + assert_raises_message(exc.CompileError, m, stmt.compile) def test_correlated_update(self): # test against a straight text subquery @@ -2852,51 +2536,6 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): "AND myothertable.othername = mytable_1.name", dialect=mssql.dialect()) - def test_delete(self): - self.assert_compile( - delete(table1, table1.c.myid == 7), - "DELETE FROM mytable WHERE mytable.myid = :myid_1") - self.assert_compile( - table1.delete().where(table1.c.myid == 7), - "DELETE FROM mytable WHERE mytable.myid = :myid_1") - self.assert_compile( - table1.delete().where(table1.c.myid == 7).\ - where(table1.c.name == 'somename'), - "DELETE FROM mytable WHERE mytable.myid = :myid_1 " - "AND mytable.name = :name_1") - - def test_delete_prefix(self): - stmt = table1.delete().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "DELETE A B C D FROM mytable", - dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "DELETE C D FROM mytable") - - def test_aliased_delete(self): - talias1 = table1.alias('t1') - self.assert_compile( - delete(talias1).where(talias1.c.myid == 7), - "DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1") - - def test_correlated_delete(self): - # test a non-correlated WHERE clause - s = select([table2.c.othername], table2.c.otherid == 7) - u = delete(table1, table1.c.name == s) - self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = " - "(SELECT myothertable.othername FROM myothertable " - "WHERE myothertable.otherid = :otherid_1)") - - # test one that is actually correlated... - s = select([table2.c.othername], table2.c.otherid == table1.c.myid) - u = table1.delete(table1.c.name == s) - self.assert_compile(u, - "DELETE FROM mytable WHERE mytable.name = (SELECT " - "myothertable.othername FROM myothertable WHERE " - "myothertable.otherid = mytable.myid)") - def test_binds_that_match_columns(self): """test bind params named after column names replace the normal SET/VALUES generation.""" diff --git a/test/sql/test_delete.py b/test/sql/test_delete.py new file mode 100644 index 000000000..b56731515 --- /dev/null +++ b/test/sql/test_delete.py @@ -0,0 +1,86 @@ +#! coding:utf-8 + +from sqlalchemy import Column, Integer, String, Table, delete, select +from sqlalchemy.dialects import mysql +from sqlalchemy.testing import AssertsCompiledSQL, fixtures + + +class _DeleteTestBase(object): + @classmethod + def define_tables(cls, metadata): + Table('mytable', metadata, + Column('myid', Integer), + Column('name', String(30)), + Column('description', String(50))) + Table('myothertable', metadata, + Column('otherid', Integer), + Column('othername', String(30))) + + +class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_delete(self): + table1 = self.tables.mytable + + self.assert_compile( + delete(table1, table1.c.myid == 7), + 'DELETE FROM mytable WHERE mytable.myid = :myid_1') + + self.assert_compile( + table1.delete().where(table1.c.myid == 7), + 'DELETE FROM mytable WHERE mytable.myid = :myid_1') + + self.assert_compile( + table1.delete(). + where(table1.c.myid == 7). + where(table1.c.name == 'somename'), + 'DELETE FROM mytable ' + 'WHERE mytable.myid = :myid_1 ' + 'AND mytable.name = :name_1') + + def test_prefix_with(self): + table1 = self.tables.mytable + + stmt = table1.delete().\ + prefix_with('A', 'B', dialect='mysql').\ + prefix_with('C', 'D') + + self.assert_compile(stmt, + 'DELETE C D FROM mytable') + + self.assert_compile(stmt, + 'DELETE A B C D FROM mytable', + dialect=mysql.dialect()) + + def test_alias(self): + table1 = self.tables.mytable + + talias1 = table1.alias('t1') + stmt = delete(talias1).where(talias1.c.myid == 7) + + self.assert_compile(stmt, + 'DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1') + + def test_correlated(self): + table1, table2 = self.tables.mytable, self.tables.myothertable + + # test a non-correlated WHERE clause + s = select([table2.c.othername], table2.c.otherid == 7) + self.assert_compile(delete(table1, table1.c.name == s), + 'DELETE FROM mytable ' + 'WHERE mytable.name = (' + 'SELECT myothertable.othername ' + 'FROM myothertable ' + 'WHERE myothertable.otherid = :otherid_1' + ')') + + # test one that is actually correlated... + s = select([table2.c.othername], table2.c.otherid == table1.c.myid) + self.assert_compile(table1.delete(table1.c.name == s), + 'DELETE FROM mytable ' + 'WHERE mytable.name = (' + 'SELECT myothertable.othername ' + 'FROM myothertable ' + 'WHERE myothertable.otherid = mytable.myid' + ')') diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py new file mode 100644 index 000000000..c74e11e18 --- /dev/null +++ b/test/sql/test_insert.py @@ -0,0 +1,302 @@ +#! coding:utf-8 + +from sqlalchemy import Column, Integer, MetaData, String, Table,\ + bindparam, exc, func, insert +from sqlalchemy.dialects import mysql, postgresql +from sqlalchemy.engine import default +from sqlalchemy.testing import AssertsCompiledSQL,\ + assert_raises_message, fixtures + + +class _InsertTestBase(object): + @classmethod + def define_tables(cls, metadata): + Table('mytable', metadata, + Column('myid', Integer), + Column('name', String(30)), + Column('description', String(30))) + Table('myothertable', metadata, + Column('otherid', Integer), + Column('othername', String(30))) + + +class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_generic_insert_bind_params_all_columns(self): + table1 = self.tables.mytable + + self.assert_compile(insert(table1), + 'INSERT INTO mytable (myid, name, description) ' + 'VALUES (:myid, :name, :description)') + + def test_insert_with_values_dict(self): + table1 = self.tables.mytable + + checkparams = { + 'myid': 3, + 'name': 'jack' + } + + self.assert_compile(insert(table1, dict(myid=3, name='jack')), + 'INSERT INTO mytable (myid, name) VALUES (:myid, :name)', + checkparams=checkparams) + + def test_insert_with_values_tuple(self): + table1 = self.tables.mytable + + checkparams = { + 'myid': 3, + 'name': 'jack', + 'description': 'mydescription' + } + + self.assert_compile(insert(table1, (3, 'jack', 'mydescription')), + 'INSERT INTO mytable (myid, name, description) ' + 'VALUES (:myid, :name, :description)', + checkparams=checkparams) + + def test_insert_with_values_func(self): + table1 = self.tables.mytable + + self.assert_compile(insert(table1, values=dict(myid=func.lala())), + 'INSERT INTO mytable (myid) VALUES (lala())') + + def test_insert_with_user_supplied_bind_params(self): + table1 = self.tables.mytable + + values = { + table1.c.myid: bindparam('userid'), + table1.c.name: bindparam('username') + } + + self.assert_compile(insert(table1, values), + 'INSERT INTO mytable (myid, name) VALUES (:userid, :username)') + + def test_insert_values(self): + table1 = self.tables.mytable + + values1 = {table1.c.myid: bindparam('userid')} + values2 = {table1.c.name: bindparam('username')} + + self.assert_compile(insert(table1, values=values1).values(values2), + 'INSERT INTO mytable (myid, name) VALUES (:userid, :username)') + + def test_prefix_with(self): + table1 = self.tables.mytable + + stmt = table1.insert().\ + prefix_with('A', 'B', dialect='mysql').\ + prefix_with('C', 'D') + + self.assert_compile(stmt, + 'INSERT C D INTO mytable (myid, name, description) ' + 'VALUES (:myid, :name, :description)') + + self.assert_compile(stmt, + 'INSERT A B C D INTO mytable (myid, name, description) ' + 'VALUES (%s, %s, %s)', dialect=mysql.dialect()) + + def test_inline_default(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=func.foobar())) + + self.assert_compile(table.insert(values={}, inline=True), + 'INSERT INTO sometable (foo) VALUES (foobar())') + + self.assert_compile(table.insert(inline=True), + 'INSERT INTO sometable (foo) VALUES (foobar())', params={}) + + +class EmptyTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_empty_insert_default(self): + table1 = self.tables.mytable + + stmt = table1.insert().values({}) # hide from 2to3 + self.assert_compile(stmt, 'INSERT INTO mytable () VALUES ()') + + def test_supports_empty_insert_true(self): + table1 = self.tables.mytable + + dialect = default.DefaultDialect() + dialect.supports_empty_insert = dialect.supports_default_values = True + + stmt = table1.insert().values({}) # hide from 2to3 + self.assert_compile(stmt, + 'INSERT INTO mytable DEFAULT VALUES', + dialect=dialect) + + def test_supports_empty_insert_false(self): + table1 = self.tables.mytable + + dialect = default.DefaultDialect() + dialect.supports_empty_insert = dialect.supports_default_values = False + + stmt = table1.insert().values({}) # hide from 2to3 + assert_raises_message(exc.CompileError, + "The 'default' dialect with current database version " + "settings does not support empty inserts.", + stmt.compile, dialect=dialect) + + +class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_not_supported(self): + table1 = self.tables.mytable + + dialect = default.DefaultDialect() + stmt = table1.insert().values([{'myid': 1}, {'myid': 2}]) + assert_raises_message( + exc.CompileError, + "The 'default' dialect with current database version settings " + "does not support in-place multirow inserts.", + stmt.compile, dialect=dialect) + + def test_named(self): + table1 = self.tables.mytable + + values = [ + {'myid': 1, 'name': 'a', 'description': 'b'}, + {'myid': 2, 'name': 'c', 'description': 'd'}, + {'myid': 3, 'name': 'e', 'description': 'f'} + ] + + checkparams = { + 'myid_0': 1, + 'myid_1': 2, + 'myid_2': 3, + 'name_0': 'a', + 'name_1': 'c', + 'name_2': 'e', + 'description_0': 'b', + 'description_1': 'd', + 'description_2': 'f', + } + + dialect = default.DefaultDialect() + dialect.supports_multivalues_insert = True + + self.assert_compile(table1.insert().values(values), + 'INSERT INTO mytable (myid, name, description) VALUES ' + '(:myid_0, :name_0, :description_0), ' + '(:myid_1, :name_1, :description_1), ' + '(:myid_2, :name_2, :description_2)', + checkparams=checkparams, dialect=dialect) + + def test_positional(self): + table1 = self.tables.mytable + + values = [ + {'myid': 1, 'name': 'a', 'description': 'b'}, + {'myid': 2, 'name': 'c', 'description': 'd'}, + {'myid': 3, 'name': 'e', 'description': 'f'} + ] + + checkpositional = (1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f') + + dialect = default.DefaultDialect() + dialect.supports_multivalues_insert = True + dialect.paramstyle = 'format' + dialect.positional = True + + self.assert_compile(table1.insert().values(values), + 'INSERT INTO mytable (myid, name, description) VALUES ' + '(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)', + checkpositional=checkpositional, dialect=dialect) + + def test_inline_default(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String), + Column('foo', Integer, default=func.foobar())) + + values = [ + {'id': 1, 'data': 'data1'}, + {'id': 2, 'data': 'data2', 'foo': 'plainfoo'}, + {'id': 3, 'data': 'data3'}, + ] + + checkparams = { + 'id_0': 1, + 'id_1': 2, + 'id_2': 3, + 'data_0': 'data1', + 'data_1': 'data2', + 'data_2': 'data3', + 'foo_1': 'plainfoo', + } + + self.assert_compile(table.insert().values(values), + 'INSERT INTO sometable (id, data, foo) VALUES ' + '(%(id_0)s, %(data_0)s, foobar()), ' + '(%(id_1)s, %(data_1)s, %(foo_1)s), ' + '(%(id_2)s, %(data_2)s, foobar())', + checkparams=checkparams, dialect=postgresql.dialect()) + + def test_server_default(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String), + Column('foo', Integer, server_default=func.foobar())) + + values = [ + {'id': 1, 'data': 'data1'}, + {'id': 2, 'data': 'data2', 'foo': 'plainfoo'}, + {'id': 3, 'data': 'data3'}, + ] + + checkparams = { + 'id_0': 1, + 'id_1': 2, + 'id_2': 3, + 'data_0': 'data1', + 'data_1': 'data2', + 'data_2': 'data3', + } + + self.assert_compile(table.insert().values(values), + 'INSERT INTO sometable (id, data) VALUES ' + '(%(id_0)s, %(data_0)s), ' + '(%(id_1)s, %(data_1)s), ' + '(%(id_2)s, %(data_2)s)', + checkparams=checkparams, dialect=postgresql.dialect()) + + def test_server_default_absent_value(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('data', String), + Column('foo', Integer, server_default=func.foobar())) + + values = [ + {'id': 1, 'data': 'data1', 'foo': 'plainfoo'}, + {'id': 2, 'data': 'data2'}, + {'id': 3, 'data': 'data3', 'foo': 'otherfoo'}, + ] + + checkparams = { + 'id_0': 1, + 'id_1': 2, + 'id_2': 3, + 'data_0': 'data1', + 'data_1': 'data2', + 'data_2': 'data3', + 'foo_0': 'plainfoo', + 'foo_2': 'otherfoo', + } + + # note the effect here is that the first set of params + # takes effect for the rest of them, when one is absent + self.assert_compile(table.insert().values(values), + 'INSERT INTO sometable (id, data, foo) VALUES ' + '(%(id_0)s, %(data_0)s, %(foo_0)s), ' + '(%(id_1)s, %(data_1)s, %(foo_0)s), ' + '(%(id_2)s, %(data_2)s, %(foo_2)s)', + checkparams=checkparams, dialect=postgresql.dialect()) diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py index d7cb8db4a..fd45d303f 100644 --- a/test/sql/test_labels.py +++ b/test/sql/test_labels.py @@ -1,19 +1,15 @@ - -from sqlalchemy import exc as exceptions -from sqlalchemy import testing -from sqlalchemy.testing import engines -from sqlalchemy import select, MetaData, Integer, or_ +from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_ from sqlalchemy.engine import default from sqlalchemy.sql import table, column -from sqlalchemy.testing import assert_raises, eq_ -from sqlalchemy.testing import fixtures, AssertsCompiledSQL -from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\ + fixtures from sqlalchemy.testing.schema import Table, Column IDENT_LENGTH = 29 class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'DefaultDialect' table1 = table('some_large_named_table', column('this_is_the_primarykey_column'), @@ -25,9 +21,6 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): column('this_is_the_data_column') ) - __dialect__ = 'DefaultDialect' - - def _length_fixture(self, length=IDENT_LENGTH, positional=False): dialect = default.DefaultDialect() dialect.max_identifier_length = length @@ -60,7 +53,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): ta = table2.alias() on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column self.assert_compile( - select([table1, ta]).select_from(table1.join(ta, on)).\ + select([table1, ta]).select_from(table1.join(ta, on)). where(ta.c.this_is_the_data_column == 'data3'), 'SELECT ' 'some_large_named_table.this_is_the_primarykey_column, ' @@ -87,16 +80,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): t = Table('this_name_is_too_long_for_what_were_doing_in_this_test', m, Column('foo', Integer)) eng = self._engine_fixture() - for meth in ( - t.create, - t.drop, - m.create_all, - m.drop_all - ): - assert_raises( - exceptions.IdentifierError, - meth, eng - ) + methods = (t.create, t.drop, m.create_all, m.drop_all) + for meth in methods: + assert_raises(exceptions.IdentifierError, meth, eng) def _assert_labeled_table1_select(self, s): table1 = self.table1 @@ -263,7 +249,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): dialect=self._length_fixture(positional=True) ) + class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'DefaultDialect' table1 = table('some_large_named_table', column('this_is_the_primarykey_column'), @@ -275,8 +263,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): column('this_is_the_data_column') ) - __dialect__ = 'DefaultDialect' - def test_adjustable_1(self): table1 = self.table1 q = table1.select( @@ -404,27 +390,27 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): 'AS _1', dialect=compile_dialect) - def test_adjustable_result_schema_column_1(self): table1 = self.table1 + q = table1.select( table1.c.this_is_the_primarykey_column == 4).apply_labels().\ alias('foo') - dialect = default.DefaultDialect(label_length=10) + dialect = default.DefaultDialect(label_length=10) compiled = q.compile(dialect=dialect) + assert set(compiled.result_map['some_2'][1]).issuperset([ - table1.c.this_is_the_data_column, - 'some_large_named_table_this_is_the_data_column', - 'some_2' + table1.c.this_is_the_data_column, + 'some_large_named_table_this_is_the_data_column', + 'some_2' + ]) - ]) assert set(compiled.result_map['some_1'][1]).issuperset([ - table1.c.this_is_the_primarykey_column, - 'some_large_named_table_this_is_the_primarykey_column', - 'some_1' - - ]) + table1.c.this_is_the_primarykey_column, + 'some_large_named_table_this_is_the_primarykey_column', + 'some_1' + ]) def test_adjustable_result_schema_column_2(self): table1 = self.table1 @@ -434,20 +420,17 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): x = select([q]) dialect = default.DefaultDialect(label_length=10) - compiled = x.compile(dialect=dialect) + assert set(compiled.result_map['this_2'][1]).issuperset([ - q.corresponding_column(table1.c.this_is_the_data_column), - 'this_is_the_data_column', - 'this_2' + q.corresponding_column(table1.c.this_is_the_data_column), + 'this_is_the_data_column', + 'this_2']) - ]) assert set(compiled.result_map['this_1'][1]).issuperset([ - q.corresponding_column(table1.c.this_is_the_primarykey_column), - 'this_is_the_primarykey_column', - 'this_1' - - ]) + q.corresponding_column(table1.c.this_is_the_primarykey_column), + 'this_is_the_primarykey_column', + 'this_1']) def test_table_plus_column_exceeds_length(self): """test that the truncation only occurs when tablename + colname are @@ -490,7 +473,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): 'other_thirty_characters_table_.thirty_characters_table_id', dialect=compile_dialect) - def test_colnames_longer_than_labels_lowercase(self): t1 = table('a', column('abcde')) self._test_colnames_longer_than_labels(t1) @@ -507,30 +489,18 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): # 'abcde' is longer than 4, but rendered as itself # needs to have all characters s = select([a1]) - self.assert_compile( - select([a1]), - "SELECT asdf.abcde FROM a AS asdf", - dialect=dialect - ) + self.assert_compile(select([a1]), + 'SELECT asdf.abcde FROM a AS asdf', + dialect=dialect) compiled = s.compile(dialect=dialect) assert set(compiled.result_map['abcde'][1]).issuperset([ - 'abcde', - a1.c.abcde, - 'abcde' - ]) + 'abcde', a1.c.abcde, 'abcde']) # column still there, but short label s = select([a1]).apply_labels() - self.assert_compile( - s, - "SELECT asdf.abcde AS _1 FROM a AS asdf", - dialect=dialect - ) + self.assert_compile(s, + 'SELECT asdf.abcde AS _1 FROM a AS asdf', + dialect=dialect) compiled = s.compile(dialect=dialect) assert set(compiled.result_map['_1'][1]).issuperset([ - 'asdf_abcde', - a1.c.abcde, - '_1' - ]) - - + 'asdf_abcde', a1.c.abcde, '_1']) diff --git a/test/sql/test_update.py b/test/sql/test_update.py index b46489cd2..a8df86cd2 100644 --- a/test/sql/test_update.py +++ b/test/sql/test_update.py @@ -1,55 +1,53 @@ -from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL -import datetime from sqlalchemy import * -from sqlalchemy import exc, sql, util -from sqlalchemy.engine import default, base from sqlalchemy import testing -from sqlalchemy.testing import fixtures -from sqlalchemy.testing.schema import Table, Column from sqlalchemy.dialects import mysql +from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures +from sqlalchemy.testing.schema import Table, Column + class _UpdateFromTestBase(object): @classmethod def define_tables(cls, metadata): + Table('mytable', metadata, + Column('myid', Integer), + Column('name', String(30)), + Column('description', String(50))) + Table('myothertable', metadata, + Column('otherid', Integer), + Column('othername', String(30))) Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('name', String(30), nullable=False), - ) - + test_needs_autoincrement=True), + Column('name', String(30), nullable=False)) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), Column('name', String(30), nullable=False), - Column('email_address', String(50), nullable=False), - ) - - Table("dingalings", metadata, + Column('email_address', String(50), nullable=False)) + Table('dingalings', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('address_id', None, ForeignKey('addresses.id')), - Column('data', String(30)), - ) + Column('data', String(30))) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name'), (7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck') ), - addresses = ( ('id', 'user_id', 'name', 'email_address'), - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed@wood.com"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed@wood.com'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com') ), dingalings = ( ('id', 'address_id', 'data'), @@ -59,288 +57,462 @@ class _UpdateFromTestBase(object): ) -class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): +class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_update_1(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 7), + 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1', + params={table1.c.name: 'fred'}) + + def test_update_2(self): + table1 = self.tables.mytable + + self.assert_compile( + table1.update(). + where(table1.c.myid == 7). + values({table1.c.myid: 5}), + 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1', + checkparams={'myid': 5, 'myid_1': 7}) + + def test_update_3(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 7), + 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1', + params={'name': 'fred'}) + + def test_update_4(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, values={table1.c.name: table1.c.myid}), + 'UPDATE mytable SET name=mytable.myid') + + def test_update_5(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, + whereclause=table1.c.name == bindparam('crit'), + values={table1.c.name: 'hi'}), + 'UPDATE mytable SET name=:name WHERE mytable.name = :crit', + params={'crit': 'notthere'}, + checkparams={'crit': 'notthere', 'name': 'hi'}) + + def test_update_6(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, + table1.c.myid == 12, + values={table1.c.name: table1.c.myid}), + 'UPDATE mytable ' + 'SET name=mytable.myid, description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'description': 'test'}, + checkparams={'description': 'test', 'myid_1': 12}) + + def test_update_7(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 12, values={table1.c.myid: 9}), + 'UPDATE mytable ' + 'SET myid=:myid, description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'myid_1': 12, 'myid': 9, 'description': 'test'}) + + def test_update_8(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 12), + 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1', + params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12}) + + def test_update_9(self): + table1 = self.tables.mytable + + s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'}) + c = s.compile(column_keys=['id', 'name']) + eq_(str(s), str(c)) + + def test_update_10(self): + table1 = self.tables.mytable + + v1 = {table1.c.name: table1.c.myid} + v2 = {table1.c.name: table1.c.name + 'foo'} + self.assert_compile( + update(table1, table1.c.myid == 12, values=v1).values(v2), + 'UPDATE mytable ' + 'SET ' + 'name=(mytable.name || :name_1), ' + 'description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'description': 'test'}) + + def test_update_11(self): + table1 = self.tables.mytable + + values = { + table1.c.name: table1.c.name + 'lala', + table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho')) + } + self.assert_compile(update(table1, + (table1.c.myid == func.hoho(4)) & + (table1.c.name == literal('foo') + + table1.c.name + literal('lala')), + values=values), + 'UPDATE mytable ' + 'SET ' + 'myid=do_stuff(mytable.myid, :param_1), ' + 'name=(mytable.name || :name_1) ' + 'WHERE ' + 'mytable.myid = hoho(:hoho_1) AND ' + 'mytable.name = :param_2 || mytable.name || :param_3') + + def test_prefix_with(self): + table1 = self.tables.mytable + + stmt = table1.update().\ + prefix_with('A', 'B', dialect='mysql').\ + prefix_with('C', 'D') + + self.assert_compile(stmt, + 'UPDATE C D mytable SET myid=:myid, name=:name, ' + 'description=:description') + + self.assert_compile(stmt, + 'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s', + dialect=mysql.dialect()) + + def test_alias(self): + table1 = self.tables.mytable + talias1 = table1.alias('t1') + + self.assert_compile(update(talias1, talias1.c.myid == 7), + 'UPDATE mytable AS t1 ' + 'SET name=:name ' + 'WHERE t1.myid = :myid_1', + params={table1.c.name: 'fred'}) + + self.assert_compile(update(talias1, table1.c.myid == 7), + 'UPDATE mytable AS t1 ' + 'SET name=:name ' + 'FROM mytable ' + 'WHERE mytable.myid = :myid_1', + params={table1.c.name: 'fred'}) + + def test_update_to_expression(self): + """test update from an expression. + + this logic is triggered currently by a left side that doesn't + have a key. The current supported use case is updating the index + of a Postgresql ARRAY type. + + """ + table1 = self.tables.mytable + expr = func.foo(table1.c.myid) + assert not hasattr(expr, 'key') + self.assert_compile(table1.update().values({expr: 'bar'}), + 'UPDATE mytable SET foo(myid)=:param_1') + + +class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, + AssertsCompiledSQL): __dialect__ = 'default' run_create_tables = run_inserts = run_deletes = None def test_render_table(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users SET name=:name FROM addresses " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM addresses ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1', + checkparams={u'email_address_1': 'e1', 'name': 'newname'}) def test_render_multi_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 2, + 'name': 'newname' + } + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1').\ - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==2), - "UPDATE users SET name=:name FROM addresses, " - "dingalings WHERE users.id = addresses.user_id " - "AND addresses.email_address = :email_address_1 " - "AND addresses.id = dingalings.address_id AND " - "dingalings.id = :id_1", - checkparams={u'email_address_1': 'e1', u'id_1': 2, - 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 2), + 'UPDATE users ' + 'SET name=:name ' + 'FROM addresses, dingalings ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1 AND ' + 'addresses.id = dingalings.address_id AND ' + 'dingalings.id = :id_1', + checkparams=checkparams) def test_render_table_mysql(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users, addresses SET users.name=%s " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = %s", + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users, addresses ' + 'SET users.name=%s ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = %s', checkparams={u'email_address_1': 'e1', 'name': 'newname'}, - dialect=mysql.dialect() - ) + dialect=mysql.dialect()) def test_render_subquery(self): users, addresses = self.tables.users, self.tables.addresses - subq = select([addresses.c.id, - addresses.c.user_id, - addresses.c.email_address]).\ - where(addresses.c.id==7).alias() + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 7, + 'name': 'newname' + } + + cols = [ + addresses.c.id, + addresses.c.user_id, + addresses.c.email_address + ] + + subq = select(cols).where(addresses.c.id == 7).alias() self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==subq.c.user_id).\ - where(subq.c.email_address=='e1'), - "UPDATE users SET name=:name FROM " - "(SELECT addresses.id AS id, addresses.user_id " - "AS user_id, addresses.email_address AS " - "email_address FROM addresses WHERE addresses.id = " - ":id_1) AS anon_1 WHERE users.id = anon_1.user_id " - "AND anon_1.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', - u'id_1': 7, 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == subq.c.user_id). + where(subq.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM (' + 'SELECT ' + 'addresses.id AS id, ' + 'addresses.user_id AS user_id, ' + 'addresses.email_address AS email_address ' + 'FROM addresses ' + 'WHERE addresses.id = :id_1' + ') AS anon_1 ' + 'WHERE users.id = anon_1.user_id ' + 'AND anon_1.email_address = :email_address_1', + checkparams=checkparams) + class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest): @testing.requires.update_from def test_exec_two_table(self): users, addresses = self.tables.users, self.tables.addresses + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.requires.update_from def test_exec_two_table_plus_alias(self): users, addresses = self.tables.users, self.tables.addresses - a1 = addresses.alias() + a1 = addresses.alias() testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==a1.c.user_id).\ - where(users.c.name=='ed').\ - where(a1.c.id==addresses.c.id) - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == a1.c.user_id). + where(users.c.name == 'ed'). + where(a1.c.id == addresses.c.id) ) + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + @testing.requires.update_from def test_exec_three_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed'). - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==1), - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id) - ).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 1)) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.only_on('mysql', 'Multi table update') def test_exec_multitable(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (7, 'jack'), - (8, 'ed2'), - (9, 'fred'), - (10, 'chuck') - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (7, 'jack'), + (8, 'ed2'), + (9, 'fred'), + (10, 'chuck')] + self._assert_users(users, expected) -class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest): + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + +class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, + fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('name', String(30), nullable=False), - Column('some_update', String(30), onupdate="im the update") - ) + Column('some_update', String(30), onupdate='im the update')) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), - Column('email_address', String(50), nullable=False), - ) + Column('email_address', String(50), nullable=False)) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name', 'some_update'), (8, 'ed', 'value'), (9, 'fred', 'value'), ), - - addresses = ( + addresses=( ('id', 'user_id', 'email_address'), - (2, 8, "ed@wood.com"), - (3, 8, "ed@bettyboop.com"), - (4, 9, "fred@fred.com") + (2, 8, 'ed@wood.com'), + (3, 8, 'ed@bettyboop.com'), + (4, 9, 'fred@fred.com') ), ) @testing.only_on('mysql', 'Multi table update') def test_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + ret = testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - set(ret.prefetch_cols()), - set([users.c.some_update]) - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed2', 'im the update'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(set(ret.prefetch_cols()), set([users.c.some_update])) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (8, 'ed2', 'im the update'), + (9, 'fred', 'value')] + self._assert_users(users, expected) @testing.only_on('mysql', 'Multi table update') def test_no_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + ret = testing.db.execute( - addresses.update().\ - values({ - 'email_address':users.c.name, - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - ret.prefetch_cols(),[] - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - # users table not actually updated, - # so no onupdate - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed', 'value'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values({'email_address': users.c.name}). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(ret.prefetch_cols(), []) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + # users table not actually updated, so no onupdate + expected = [ + (8, 'ed', 'value'), + (9, 'fred', 'value')] + self._assert_users(users, expected) + + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) |
