summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-04-01 13:57:44 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-04-01 13:57:44 -0400
commit5ee1bb09de3b9675d482d608a0474c89f93259e4 (patch)
tree4961455a758dcae42def4562e6a1a3f846338858 /test/sql
parent82b6e074920cb972a569db4d2d395c8949868a31 (diff)
parent25c6732019550e6f26ae4da58084b939e04cffa1 (diff)
downloadsqlalchemy-5ee1bb09de3b9675d482d608a0474c89f93259e4.tar.gz
merge default
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compiler.py369
-rw-r--r--test/sql/test_delete.py86
-rw-r--r--test/sql/test_insert.py302
-rw-r--r--test/sql/test_labels.py102
-rw-r--r--test/sql/test_update.py646
5 files changed, 837 insertions, 668 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index fa15c7aa2..c3638f52f 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -14,8 +14,8 @@ from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
- func, not_, cast, text, tuple_, exists, delete, update, bindparam,\
- insert, literal, and_, null, type_coerce, alias, or_, literal_column,\
+ func, not_, cast, text, tuple_, exists, update, bindparam,\
+ literal, and_, null, type_coerce, alias, or_, literal_column,\
Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\
intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
over, subquery, case
@@ -2460,326 +2460,10 @@ class KwargPropagationTest(fixtures.TestBase):
class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
- def test_insert(self):
- # generic insert, will create bind params for all columns
- self.assert_compile(insert(table1),
- "INSERT INTO mytable (myid, name, description) "
- "VALUES (:myid, :name, :description)")
-
- # insert with user-supplied bind params for specific columns,
- # cols provided literally
- self.assert_compile(
- insert(table1, {
- table1.c.myid: bindparam('userid'),
- table1.c.name: bindparam('username')}),
- "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
-
- # insert with user-supplied bind params for specific columns, cols
- # provided as strings
- self.assert_compile(
- insert(table1, dict(myid=3, name='jack')),
- "INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
- )
-
- # test with a tuple of params instead of named
- self.assert_compile(
- insert(table1, (3, 'jack', 'mydescription')),
- "INSERT INTO mytable (myid, name, description) VALUES "
- "(:myid, :name, :description)",
- checkparams={
- 'myid': 3, 'name': 'jack', 'description': 'mydescription'}
- )
-
- self.assert_compile(
- insert(table1, values={
- table1.c.myid: bindparam('userid')
- }).values(
- {table1.c.name: bindparam('username')}),
- "INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
- )
-
- self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
- "INSERT INTO mytable (myid) VALUES (lala())")
-
- def test_insert_prefix(self):
- stmt = table1.insert().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "INSERT A B C D INTO mytable (myid, name, description) "
- "VALUES (%s, %s, %s)", dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "INSERT C D INTO mytable (myid, name, description) "
- "VALUES (:myid, :name, :description)")
-
- def test_inline_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('foo', Integer, default=func.foobar()))
- self.assert_compile(
- table.insert(values={}, inline=True),
- "INSERT INTO sometable (foo) VALUES (foobar())")
- self.assert_compile(
- table.insert(inline=True),
- "INSERT INTO sometable (foo) VALUES (foobar())", params={})
-
def test_insert_returning_not_in_default(self):
stmt = table1.insert().returning(table1.c.myid)
- assert_raises_message(
- exc.CompileError,
- "RETURNING is not supported by this dialect's statement compiler.",
- stmt.compile
- )
-
- def test_empty_insert_default(self):
- stmt = table1.insert().values({}) # hide from 2to3
- self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()")
-
- def test_empty_insert_default_values(self):
- stmt = table1.insert().values({}) # hide from 2to3
- dialect = default.DefaultDialect()
- dialect.supports_empty_insert = dialect.supports_default_values = True
- self.assert_compile(stmt, "INSERT INTO mytable DEFAULT VALUES",
- dialect=dialect)
-
- def test_empty_insert_not_supported(self):
- stmt = table1.insert().values({}) # hide from 2to3
- dialect = default.DefaultDialect()
- dialect.supports_empty_insert = dialect.supports_default_values = False
- assert_raises_message(
- exc.CompileError,
- "The 'default' dialect with current database version "
- "settings does not support empty inserts.",
- stmt.compile, dialect=dialect
- )
-
- def test_multivalues_insert_not_supported(self):
- stmt = table1.insert().values([{"myid": 1}, {"myid": 2}])
- dialect = default.DefaultDialect()
- assert_raises_message(
- exc.CompileError,
- "The 'default' dialect with current database version settings "
- "does not support in-place multirow inserts.",
- stmt.compile, dialect=dialect
- )
-
- def test_multivalues_insert_named(self):
- stmt = table1.insert().\
- values([{"myid": 1, "name": 'a', "description": 'b'},
- {"myid": 2, "name": 'c', "description": 'd'},
- {"myid": 3, "name": 'e', "description": 'f'}
- ])
-
- result = "INSERT INTO mytable (myid, name, description) VALUES " \
- "(:myid_0, :name_0, :description_0), " \
- "(:myid_1, :name_1, :description_1), " \
- "(:myid_2, :name_2, :description_2)"
-
- dialect = default.DefaultDialect()
- dialect.supports_multivalues_insert = True
- self.assert_compile(stmt, result,
- checkparams={
- 'description_2': 'f', 'name_2': 'e',
- 'name_0': 'a', 'name_1': 'c', 'myid_2': 3,
- 'description_0': 'b', 'myid_0': 1,
- 'myid_1': 2, 'description_1': 'd'
- },
- dialect=dialect)
-
- def test_multivalues_insert_positional(self):
- stmt = table1.insert().\
- values([{"myid": 1, "name": 'a', "description": 'b'},
- {"myid": 2, "name": 'c', "description": 'd'},
- {"myid": 3, "name": 'e', "description": 'f'}
- ])
-
- result = "INSERT INTO mytable (myid, name, description) VALUES " \
- "(%s, %s, %s), " \
- "(%s, %s, %s), " \
- "(%s, %s, %s)" \
-
- dialect = default.DefaultDialect()
- dialect.supports_multivalues_insert = True
- dialect.paramstyle = "format"
- dialect.positional = True
- self.assert_compile(stmt, result,
- checkpositional=(1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f'),
- dialect=dialect)
-
- def test_multirow_inline_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String),
- Column('foo', Integer, default=func.foobar()))
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1"},
- {"id": 2, "data": "data2", "foo": "plainfoo"},
- {"id": 3, "data": "data3"},
- ])
- result = "INSERT INTO sometable (id, data, foo) VALUES "\
- "(%(id_0)s, %(data_0)s, foobar()), "\
- "(%(id_1)s, %(data_1)s, %(foo_1)s), "\
- "(%(id_2)s, %(data_2)s, foobar())"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'foo_1': 'plainfoo', 'data_1': 'data2',
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- def test_multirow_server_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String),
- Column('foo', Integer, server_default=func.foobar()))
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1"},
- {"id": 2, "data": "data2", "foo": "plainfoo"},
- {"id": 3, "data": "data3"},
- ])
- result = "INSERT INTO sometable (id, data) VALUES "\
- "(%(id_0)s, %(data_0)s), "\
- "(%(id_1)s, %(data_1)s), "\
- "(%(id_2)s, %(data_2)s)"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'data_1': 'data2',
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1", "foo": "plainfoo"},
- {"id": 2, "data": "data2"},
- {"id": 3, "data": "data3", "foo": "otherfoo"},
- ])
-
- # note the effect here is that the first set of params
- # takes effect for the rest of them, when one is absent
- result = "INSERT INTO sometable (id, data, foo) VALUES "\
- "(%(id_0)s, %(data_0)s, %(foo_0)s), "\
- "(%(id_1)s, %(data_1)s, %(foo_0)s), "\
- "(%(id_2)s, %(data_2)s, %(foo_2)s)"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'data_1': 'data2',
- "foo_0": "plainfoo",
- "foo_2": "otherfoo",
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- def test_update(self):
- self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params={table1.c.name: 'fred'})
- self.assert_compile(
- table1.update().where(table1.c.myid == 7).
- values({table1.c.myid: 5}),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- checkparams={'myid': 5, 'myid_1': 7})
- self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params={'name': 'fred'})
- self.assert_compile(
- update(table1, values={table1.c.name: table1.c.myid}),
- "UPDATE mytable SET name=mytable.myid")
- self.assert_compile(
- update(table1,
- whereclause=table1.c.name == bindparam('crit'),
- values={table1.c.name: 'hi'}),
- "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
- params={'crit': 'notthere'},
- checkparams={'crit': 'notthere', 'name': 'hi'})
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.name: table1.c.myid}),
- "UPDATE mytable SET name=mytable.myid, description="
- ":description WHERE mytable.myid = :myid_1",
- params={'description': 'test'},
- checkparams={'description': 'test', 'myid_1': 12})
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.myid: 9}),
- "UPDATE mytable SET myid=:myid, description=:description "
- "WHERE mytable.myid = :myid_1",
- params={'myid_1': 12, 'myid': 9, 'description': 'test'})
- self.assert_compile(
- update(table1, table1.c.myid == 12),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
- s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
- c = s.compile(column_keys=['id', 'name'])
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.name: table1.c.myid}
- ).values({table1.c.name: table1.c.name + 'foo'}),
- "UPDATE mytable SET name=(mytable.name || :name_1), "
- "description=:description WHERE mytable.myid = :myid_1",
- params={'description': 'test'})
- eq_(str(s), str(c))
-
- self.assert_compile(update(table1,
- (table1.c.myid == func.hoho(4)) &
- (table1.c.name == literal('foo') +
- table1.c.name + literal('lala')),
- values={
- table1.c.name: table1.c.name + "lala",
- table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
- "name=(mytable.name || :name_1) "
- "WHERE mytable.myid = hoho(:hoho_1) "
- "AND mytable.name = :param_2 || "
- "mytable.name || :param_3")
-
- def test_update_prefix(self):
- stmt = table1.update().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "UPDATE A B C D mytable SET myid=%s, name=%s, description=%s",
- dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "UPDATE C D mytable SET myid=:myid, name=:name, "
- "description=:description")
-
- def test_aliased_update(self):
- talias1 = table1.alias('t1')
- self.assert_compile(
- update(talias1, talias1.c.myid == 7),
- "UPDATE mytable AS t1 SET name=:name WHERE t1.myid = :myid_1",
- params={table1.c.name: 'fred'})
- self.assert_compile(
- update(talias1, table1.c.myid == 7),
- "UPDATE mytable AS t1 SET name=:name FROM "
- "mytable WHERE mytable.myid = :myid_1",
- params={table1.c.name: 'fred'})
-
- def test_update_to_expression(self):
- """test update from an expression.
-
- this logic is triggered currently by a left side that doesn't
- have a key. The current supported use case is updating the index
- of a Postgresql ARRAY type.
-
- """
- expr = func.foo(table1.c.myid)
- assert not hasattr(expr, "key")
- self.assert_compile(
- table1.update().values({expr: 'bar'}),
- "UPDATE mytable SET foo(myid)=:param_1"
- )
+ m = "RETURNING is not supported by this dialect's statement compiler."
+ assert_raises_message(exc.CompileError, m, stmt.compile)
def test_correlated_update(self):
# test against a straight text subquery
@@ -2852,51 +2536,6 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
"AND myothertable.othername = mytable_1.name",
dialect=mssql.dialect())
- def test_delete(self):
- self.assert_compile(
- delete(table1, table1.c.myid == 7),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1")
- self.assert_compile(
- table1.delete().where(table1.c.myid == 7),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1")
- self.assert_compile(
- table1.delete().where(table1.c.myid == 7).\
- where(table1.c.name == 'somename'),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1 "
- "AND mytable.name = :name_1")
-
- def test_delete_prefix(self):
- stmt = table1.delete().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "DELETE A B C D FROM mytable",
- dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "DELETE C D FROM mytable")
-
- def test_aliased_delete(self):
- talias1 = table1.alias('t1')
- self.assert_compile(
- delete(talias1).where(talias1.c.myid == 7),
- "DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1")
-
- def test_correlated_delete(self):
- # test a non-correlated WHERE clause
- s = select([table2.c.othername], table2.c.otherid == 7)
- u = delete(table1, table1.c.name == s)
- self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "
- "(SELECT myothertable.othername FROM myothertable "
- "WHERE myothertable.otherid = :otherid_1)")
-
- # test one that is actually correlated...
- s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
- u = table1.delete(table1.c.name == s)
- self.assert_compile(u,
- "DELETE FROM mytable WHERE mytable.name = (SELECT "
- "myothertable.othername FROM myothertable WHERE "
- "myothertable.otherid = mytable.myid)")
-
def test_binds_that_match_columns(self):
"""test bind params named after column names
replace the normal SET/VALUES generation."""
diff --git a/test/sql/test_delete.py b/test/sql/test_delete.py
new file mode 100644
index 000000000..b56731515
--- /dev/null
+++ b/test/sql/test_delete.py
@@ -0,0 +1,86 @@
+#! coding:utf-8
+
+from sqlalchemy import Column, Integer, String, Table, delete, select
+from sqlalchemy.dialects import mysql
+from sqlalchemy.testing import AssertsCompiledSQL, fixtures
+
+
+class _DeleteTestBase(object):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('mytable', metadata,
+ Column('myid', Integer),
+ Column('name', String(30)),
+ Column('description', String(50)))
+ Table('myothertable', metadata,
+ Column('otherid', Integer),
+ Column('othername', String(30)))
+
+
+class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_delete(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ delete(table1, table1.c.myid == 7),
+ 'DELETE FROM mytable WHERE mytable.myid = :myid_1')
+
+ self.assert_compile(
+ table1.delete().where(table1.c.myid == 7),
+ 'DELETE FROM mytable WHERE mytable.myid = :myid_1')
+
+ self.assert_compile(
+ table1.delete().
+ where(table1.c.myid == 7).
+ where(table1.c.name == 'somename'),
+ 'DELETE FROM mytable '
+ 'WHERE mytable.myid = :myid_1 '
+ 'AND mytable.name = :name_1')
+
+ def test_prefix_with(self):
+ table1 = self.tables.mytable
+
+ stmt = table1.delete().\
+ prefix_with('A', 'B', dialect='mysql').\
+ prefix_with('C', 'D')
+
+ self.assert_compile(stmt,
+ 'DELETE C D FROM mytable')
+
+ self.assert_compile(stmt,
+ 'DELETE A B C D FROM mytable',
+ dialect=mysql.dialect())
+
+ def test_alias(self):
+ table1 = self.tables.mytable
+
+ talias1 = table1.alias('t1')
+ stmt = delete(talias1).where(talias1.c.myid == 7)
+
+ self.assert_compile(stmt,
+ 'DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1')
+
+ def test_correlated(self):
+ table1, table2 = self.tables.mytable, self.tables.myothertable
+
+ # test a non-correlated WHERE clause
+ s = select([table2.c.othername], table2.c.otherid == 7)
+ self.assert_compile(delete(table1, table1.c.name == s),
+ 'DELETE FROM mytable '
+ 'WHERE mytable.name = ('
+ 'SELECT myothertable.othername '
+ 'FROM myothertable '
+ 'WHERE myothertable.otherid = :otherid_1'
+ ')')
+
+ # test one that is actually correlated...
+ s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
+ self.assert_compile(table1.delete(table1.c.name == s),
+ 'DELETE FROM mytable '
+ 'WHERE mytable.name = ('
+ 'SELECT myothertable.othername '
+ 'FROM myothertable '
+ 'WHERE myothertable.otherid = mytable.myid'
+ ')')
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
new file mode 100644
index 000000000..c74e11e18
--- /dev/null
+++ b/test/sql/test_insert.py
@@ -0,0 +1,302 @@
+#! coding:utf-8
+
+from sqlalchemy import Column, Integer, MetaData, String, Table,\
+ bindparam, exc, func, insert
+from sqlalchemy.dialects import mysql, postgresql
+from sqlalchemy.engine import default
+from sqlalchemy.testing import AssertsCompiledSQL,\
+ assert_raises_message, fixtures
+
+
+class _InsertTestBase(object):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('mytable', metadata,
+ Column('myid', Integer),
+ Column('name', String(30)),
+ Column('description', String(30)))
+ Table('myothertable', metadata,
+ Column('otherid', Integer),
+ Column('othername', String(30)))
+
+
+class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_generic_insert_bind_params_all_columns(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(insert(table1),
+ 'INSERT INTO mytable (myid, name, description) '
+ 'VALUES (:myid, :name, :description)')
+
+ def test_insert_with_values_dict(self):
+ table1 = self.tables.mytable
+
+ checkparams = {
+ 'myid': 3,
+ 'name': 'jack'
+ }
+
+ self.assert_compile(insert(table1, dict(myid=3, name='jack')),
+ 'INSERT INTO mytable (myid, name) VALUES (:myid, :name)',
+ checkparams=checkparams)
+
+ def test_insert_with_values_tuple(self):
+ table1 = self.tables.mytable
+
+ checkparams = {
+ 'myid': 3,
+ 'name': 'jack',
+ 'description': 'mydescription'
+ }
+
+ self.assert_compile(insert(table1, (3, 'jack', 'mydescription')),
+ 'INSERT INTO mytable (myid, name, description) '
+ 'VALUES (:myid, :name, :description)',
+ checkparams=checkparams)
+
+ def test_insert_with_values_func(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(insert(table1, values=dict(myid=func.lala())),
+ 'INSERT INTO mytable (myid) VALUES (lala())')
+
+ def test_insert_with_user_supplied_bind_params(self):
+ table1 = self.tables.mytable
+
+ values = {
+ table1.c.myid: bindparam('userid'),
+ table1.c.name: bindparam('username')
+ }
+
+ self.assert_compile(insert(table1, values),
+ 'INSERT INTO mytable (myid, name) VALUES (:userid, :username)')
+
+ def test_insert_values(self):
+ table1 = self.tables.mytable
+
+ values1 = {table1.c.myid: bindparam('userid')}
+ values2 = {table1.c.name: bindparam('username')}
+
+ self.assert_compile(insert(table1, values=values1).values(values2),
+ 'INSERT INTO mytable (myid, name) VALUES (:userid, :username)')
+
+ def test_prefix_with(self):
+ table1 = self.tables.mytable
+
+ stmt = table1.insert().\
+ prefix_with('A', 'B', dialect='mysql').\
+ prefix_with('C', 'D')
+
+ self.assert_compile(stmt,
+ 'INSERT C D INTO mytable (myid, name, description) '
+ 'VALUES (:myid, :name, :description)')
+
+ self.assert_compile(stmt,
+ 'INSERT A B C D INTO mytable (myid, name, description) '
+ 'VALUES (%s, %s, %s)', dialect=mysql.dialect())
+
+ def test_inline_default(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=func.foobar()))
+
+ self.assert_compile(table.insert(values={}, inline=True),
+ 'INSERT INTO sometable (foo) VALUES (foobar())')
+
+ self.assert_compile(table.insert(inline=True),
+ 'INSERT INTO sometable (foo) VALUES (foobar())', params={})
+
+
+class EmptyTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_empty_insert_default(self):
+ table1 = self.tables.mytable
+
+ stmt = table1.insert().values({}) # hide from 2to3
+ self.assert_compile(stmt, 'INSERT INTO mytable () VALUES ()')
+
+ def test_supports_empty_insert_true(self):
+ table1 = self.tables.mytable
+
+ dialect = default.DefaultDialect()
+ dialect.supports_empty_insert = dialect.supports_default_values = True
+
+ stmt = table1.insert().values({}) # hide from 2to3
+ self.assert_compile(stmt,
+ 'INSERT INTO mytable DEFAULT VALUES',
+ dialect=dialect)
+
+ def test_supports_empty_insert_false(self):
+ table1 = self.tables.mytable
+
+ dialect = default.DefaultDialect()
+ dialect.supports_empty_insert = dialect.supports_default_values = False
+
+ stmt = table1.insert().values({}) # hide from 2to3
+ assert_raises_message(exc.CompileError,
+ "The 'default' dialect with current database version "
+ "settings does not support empty inserts.",
+ stmt.compile, dialect=dialect)
+
+
+class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_not_supported(self):
+ table1 = self.tables.mytable
+
+ dialect = default.DefaultDialect()
+ stmt = table1.insert().values([{'myid': 1}, {'myid': 2}])
+ assert_raises_message(
+ exc.CompileError,
+ "The 'default' dialect with current database version settings "
+ "does not support in-place multirow inserts.",
+ stmt.compile, dialect=dialect)
+
+ def test_named(self):
+ table1 = self.tables.mytable
+
+ values = [
+ {'myid': 1, 'name': 'a', 'description': 'b'},
+ {'myid': 2, 'name': 'c', 'description': 'd'},
+ {'myid': 3, 'name': 'e', 'description': 'f'}
+ ]
+
+ checkparams = {
+ 'myid_0': 1,
+ 'myid_1': 2,
+ 'myid_2': 3,
+ 'name_0': 'a',
+ 'name_1': 'c',
+ 'name_2': 'e',
+ 'description_0': 'b',
+ 'description_1': 'd',
+ 'description_2': 'f',
+ }
+
+ dialect = default.DefaultDialect()
+ dialect.supports_multivalues_insert = True
+
+ self.assert_compile(table1.insert().values(values),
+ 'INSERT INTO mytable (myid, name, description) VALUES '
+ '(:myid_0, :name_0, :description_0), '
+ '(:myid_1, :name_1, :description_1), '
+ '(:myid_2, :name_2, :description_2)',
+ checkparams=checkparams, dialect=dialect)
+
+ def test_positional(self):
+ table1 = self.tables.mytable
+
+ values = [
+ {'myid': 1, 'name': 'a', 'description': 'b'},
+ {'myid': 2, 'name': 'c', 'description': 'd'},
+ {'myid': 3, 'name': 'e', 'description': 'f'}
+ ]
+
+ checkpositional = (1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f')
+
+ dialect = default.DefaultDialect()
+ dialect.supports_multivalues_insert = True
+ dialect.paramstyle = 'format'
+ dialect.positional = True
+
+ self.assert_compile(table1.insert().values(values),
+ 'INSERT INTO mytable (myid, name, description) VALUES '
+ '(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)',
+ checkpositional=checkpositional, dialect=dialect)
+
+ def test_inline_default(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String),
+ Column('foo', Integer, default=func.foobar()))
+
+ values = [
+ {'id': 1, 'data': 'data1'},
+ {'id': 2, 'data': 'data2', 'foo': 'plainfoo'},
+ {'id': 3, 'data': 'data3'},
+ ]
+
+ checkparams = {
+ 'id_0': 1,
+ 'id_1': 2,
+ 'id_2': 3,
+ 'data_0': 'data1',
+ 'data_1': 'data2',
+ 'data_2': 'data3',
+ 'foo_1': 'plainfoo',
+ }
+
+ self.assert_compile(table.insert().values(values),
+ 'INSERT INTO sometable (id, data, foo) VALUES '
+ '(%(id_0)s, %(data_0)s, foobar()), '
+ '(%(id_1)s, %(data_1)s, %(foo_1)s), '
+ '(%(id_2)s, %(data_2)s, foobar())',
+ checkparams=checkparams, dialect=postgresql.dialect())
+
+ def test_server_default(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String),
+ Column('foo', Integer, server_default=func.foobar()))
+
+ values = [
+ {'id': 1, 'data': 'data1'},
+ {'id': 2, 'data': 'data2', 'foo': 'plainfoo'},
+ {'id': 3, 'data': 'data3'},
+ ]
+
+ checkparams = {
+ 'id_0': 1,
+ 'id_1': 2,
+ 'id_2': 3,
+ 'data_0': 'data1',
+ 'data_1': 'data2',
+ 'data_2': 'data3',
+ }
+
+ self.assert_compile(table.insert().values(values),
+ 'INSERT INTO sometable (id, data) VALUES '
+ '(%(id_0)s, %(data_0)s), '
+ '(%(id_1)s, %(data_1)s), '
+ '(%(id_2)s, %(data_2)s)',
+ checkparams=checkparams, dialect=postgresql.dialect())
+
+ def test_server_default_absent_value(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String),
+ Column('foo', Integer, server_default=func.foobar()))
+
+ values = [
+ {'id': 1, 'data': 'data1', 'foo': 'plainfoo'},
+ {'id': 2, 'data': 'data2'},
+ {'id': 3, 'data': 'data3', 'foo': 'otherfoo'},
+ ]
+
+ checkparams = {
+ 'id_0': 1,
+ 'id_1': 2,
+ 'id_2': 3,
+ 'data_0': 'data1',
+ 'data_1': 'data2',
+ 'data_2': 'data3',
+ 'foo_0': 'plainfoo',
+ 'foo_2': 'otherfoo',
+ }
+
+ # note the effect here is that the first set of params
+ # takes effect for the rest of them, when one is absent
+ self.assert_compile(table.insert().values(values),
+ 'INSERT INTO sometable (id, data, foo) VALUES '
+ '(%(id_0)s, %(data_0)s, %(foo_0)s), '
+ '(%(id_1)s, %(data_1)s, %(foo_0)s), '
+ '(%(id_2)s, %(data_2)s, %(foo_2)s)',
+ checkparams=checkparams, dialect=postgresql.dialect())
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index d7cb8db4a..fd45d303f 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -1,19 +1,15 @@
-
-from sqlalchemy import exc as exceptions
-from sqlalchemy import testing
-from sqlalchemy.testing import engines
-from sqlalchemy import select, MetaData, Integer, or_
+from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
-from sqlalchemy.testing import assert_raises, eq_
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\
+ fixtures
from sqlalchemy.testing.schema import Table, Column
IDENT_LENGTH = 29
class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'DefaultDialect'
table1 = table('some_large_named_table',
column('this_is_the_primarykey_column'),
@@ -25,9 +21,6 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
column('this_is_the_data_column')
)
- __dialect__ = 'DefaultDialect'
-
-
def _length_fixture(self, length=IDENT_LENGTH, positional=False):
dialect = default.DefaultDialect()
dialect.max_identifier_length = length
@@ -60,7 +53,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
ta = table2.alias()
on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column
self.assert_compile(
- select([table1, ta]).select_from(table1.join(ta, on)).\
+ select([table1, ta]).select_from(table1.join(ta, on)).
where(ta.c.this_is_the_data_column == 'data3'),
'SELECT '
'some_large_named_table.this_is_the_primarykey_column, '
@@ -87,16 +80,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
t = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
m, Column('foo', Integer))
eng = self._engine_fixture()
- for meth in (
- t.create,
- t.drop,
- m.create_all,
- m.drop_all
- ):
- assert_raises(
- exceptions.IdentifierError,
- meth, eng
- )
+ methods = (t.create, t.drop, m.create_all, m.drop_all)
+ for meth in methods:
+ assert_raises(exceptions.IdentifierError, meth, eng)
def _assert_labeled_table1_select(self, s):
table1 = self.table1
@@ -263,7 +249,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=self._length_fixture(positional=True)
)
+
class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'DefaultDialect'
table1 = table('some_large_named_table',
column('this_is_the_primarykey_column'),
@@ -275,8 +263,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
column('this_is_the_data_column')
)
- __dialect__ = 'DefaultDialect'
-
def test_adjustable_1(self):
table1 = self.table1
q = table1.select(
@@ -404,27 +390,27 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
'AS _1',
dialect=compile_dialect)
-
def test_adjustable_result_schema_column_1(self):
table1 = self.table1
+
q = table1.select(
table1.c.this_is_the_primarykey_column == 4).apply_labels().\
alias('foo')
- dialect = default.DefaultDialect(label_length=10)
+ dialect = default.DefaultDialect(label_length=10)
compiled = q.compile(dialect=dialect)
+
assert set(compiled.result_map['some_2'][1]).issuperset([
- table1.c.this_is_the_data_column,
- 'some_large_named_table_this_is_the_data_column',
- 'some_2'
+ table1.c.this_is_the_data_column,
+ 'some_large_named_table_this_is_the_data_column',
+ 'some_2'
+ ])
- ])
assert set(compiled.result_map['some_1'][1]).issuperset([
- table1.c.this_is_the_primarykey_column,
- 'some_large_named_table_this_is_the_primarykey_column',
- 'some_1'
-
- ])
+ table1.c.this_is_the_primarykey_column,
+ 'some_large_named_table_this_is_the_primarykey_column',
+ 'some_1'
+ ])
def test_adjustable_result_schema_column_2(self):
table1 = self.table1
@@ -434,20 +420,17 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
x = select([q])
dialect = default.DefaultDialect(label_length=10)
-
compiled = x.compile(dialect=dialect)
+
assert set(compiled.result_map['this_2'][1]).issuperset([
- q.corresponding_column(table1.c.this_is_the_data_column),
- 'this_is_the_data_column',
- 'this_2'
+ q.corresponding_column(table1.c.this_is_the_data_column),
+ 'this_is_the_data_column',
+ 'this_2'])
- ])
assert set(compiled.result_map['this_1'][1]).issuperset([
- q.corresponding_column(table1.c.this_is_the_primarykey_column),
- 'this_is_the_primarykey_column',
- 'this_1'
-
- ])
+ q.corresponding_column(table1.c.this_is_the_primarykey_column),
+ 'this_is_the_primarykey_column',
+ 'this_1'])
def test_table_plus_column_exceeds_length(self):
"""test that the truncation only occurs when tablename + colname are
@@ -490,7 +473,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
'other_thirty_characters_table_.thirty_characters_table_id',
dialect=compile_dialect)
-
def test_colnames_longer_than_labels_lowercase(self):
t1 = table('a', column('abcde'))
self._test_colnames_longer_than_labels(t1)
@@ -507,30 +489,18 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
# 'abcde' is longer than 4, but rendered as itself
# needs to have all characters
s = select([a1])
- self.assert_compile(
- select([a1]),
- "SELECT asdf.abcde FROM a AS asdf",
- dialect=dialect
- )
+ self.assert_compile(select([a1]),
+ 'SELECT asdf.abcde FROM a AS asdf',
+ dialect=dialect)
compiled = s.compile(dialect=dialect)
assert set(compiled.result_map['abcde'][1]).issuperset([
- 'abcde',
- a1.c.abcde,
- 'abcde'
- ])
+ 'abcde', a1.c.abcde, 'abcde'])
# column still there, but short label
s = select([a1]).apply_labels()
- self.assert_compile(
- s,
- "SELECT asdf.abcde AS _1 FROM a AS asdf",
- dialect=dialect
- )
+ self.assert_compile(s,
+ 'SELECT asdf.abcde AS _1 FROM a AS asdf',
+ dialect=dialect)
compiled = s.compile(dialect=dialect)
assert set(compiled.result_map['_1'][1]).issuperset([
- 'asdf_abcde',
- a1.c.abcde,
- '_1'
- ])
-
-
+ 'asdf_abcde', a1.c.abcde, '_1'])
diff --git a/test/sql/test_update.py b/test/sql/test_update.py
index b46489cd2..a8df86cd2 100644
--- a/test/sql/test_update.py
+++ b/test/sql/test_update.py
@@ -1,55 +1,53 @@
-from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
-import datetime
from sqlalchemy import *
-from sqlalchemy import exc, sql, util
-from sqlalchemy.engine import default, base
from sqlalchemy import testing
-from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.dialects import mysql
+from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
+from sqlalchemy.testing.schema import Table, Column
+
class _UpdateFromTestBase(object):
@classmethod
def define_tables(cls, metadata):
+ Table('mytable', metadata,
+ Column('myid', Integer),
+ Column('name', String(30)),
+ Column('description', String(50)))
+ Table('myothertable', metadata,
+ Column('otherid', Integer),
+ Column('othername', String(30)))
Table('users', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(30), nullable=False),
- )
-
+ test_needs_autoincrement=True),
+ Column('name', String(30), nullable=False))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('name', String(30), nullable=False),
- Column('email_address', String(50), nullable=False),
- )
-
- Table("dingalings", metadata,
+ Column('email_address', String(50), nullable=False))
+ Table('dingalings', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('address_id', None, ForeignKey('addresses.id')),
- Column('data', String(30)),
- )
+ Column('data', String(30)))
@classmethod
def fixtures(cls):
return dict(
- users = (
+ users=(
('id', 'name'),
(7, 'jack'),
(8, 'ed'),
(9, 'fred'),
(10, 'chuck')
),
-
addresses = (
('id', 'user_id', 'name', 'email_address'),
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed@wood.com"),
- (3, 8, 'x', "ed@bettyboop.com"),
- (4, 8, 'x', "ed@lala.com"),
- (5, 9, 'x', "fred@fred.com")
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed@wood.com'),
+ (3, 8, 'x', 'ed@bettyboop.com'),
+ (4, 8, 'x', 'ed@lala.com'),
+ (5, 9, 'x', 'fred@fred.com')
),
dingalings = (
('id', 'address_id', 'data'),
@@ -59,288 +57,462 @@ class _UpdateFromTestBase(object):
)
-class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_update_1(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1, table1.c.myid == 7),
+ 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
+ params={table1.c.name: 'fred'})
+
+ def test_update_2(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ table1.update().
+ where(table1.c.myid == 7).
+ values({table1.c.myid: 5}),
+ 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
+ checkparams={'myid': 5, 'myid_1': 7})
+
+ def test_update_3(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1, table1.c.myid == 7),
+ 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
+ params={'name': 'fred'})
+
+ def test_update_4(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1, values={table1.c.name: table1.c.myid}),
+ 'UPDATE mytable SET name=mytable.myid')
+
+ def test_update_5(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1,
+ whereclause=table1.c.name == bindparam('crit'),
+ values={table1.c.name: 'hi'}),
+ 'UPDATE mytable SET name=:name WHERE mytable.name = :crit',
+ params={'crit': 'notthere'},
+ checkparams={'crit': 'notthere', 'name': 'hi'})
+
+ def test_update_6(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1,
+ table1.c.myid == 12,
+ values={table1.c.name: table1.c.myid}),
+ 'UPDATE mytable '
+ 'SET name=mytable.myid, description=:description '
+ 'WHERE mytable.myid = :myid_1',
+ params={'description': 'test'},
+ checkparams={'description': 'test', 'myid_1': 12})
+
+ def test_update_7(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
+ 'UPDATE mytable '
+ 'SET myid=:myid, description=:description '
+ 'WHERE mytable.myid = :myid_1',
+ params={'myid_1': 12, 'myid': 9, 'description': 'test'})
+
+ def test_update_8(self):
+ table1 = self.tables.mytable
+
+ self.assert_compile(
+ update(table1, table1.c.myid == 12),
+ 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
+ params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
+
+ def test_update_9(self):
+ table1 = self.tables.mytable
+
+ s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
+ c = s.compile(column_keys=['id', 'name'])
+ eq_(str(s), str(c))
+
+ def test_update_10(self):
+ table1 = self.tables.mytable
+
+ v1 = {table1.c.name: table1.c.myid}
+ v2 = {table1.c.name: table1.c.name + 'foo'}
+ self.assert_compile(
+ update(table1, table1.c.myid == 12, values=v1).values(v2),
+ 'UPDATE mytable '
+ 'SET '
+ 'name=(mytable.name || :name_1), '
+ 'description=:description '
+ 'WHERE mytable.myid = :myid_1',
+ params={'description': 'test'})
+
+ def test_update_11(self):
+ table1 = self.tables.mytable
+
+ values = {
+ table1.c.name: table1.c.name + 'lala',
+ table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
+ }
+ self.assert_compile(update(table1,
+ (table1.c.myid == func.hoho(4)) &
+ (table1.c.name == literal('foo') +
+ table1.c.name + literal('lala')),
+ values=values),
+ 'UPDATE mytable '
+ 'SET '
+ 'myid=do_stuff(mytable.myid, :param_1), '
+ 'name=(mytable.name || :name_1) '
+ 'WHERE '
+ 'mytable.myid = hoho(:hoho_1) AND '
+ 'mytable.name = :param_2 || mytable.name || :param_3')
+
+ def test_prefix_with(self):
+ table1 = self.tables.mytable
+
+ stmt = table1.update().\
+ prefix_with('A', 'B', dialect='mysql').\
+ prefix_with('C', 'D')
+
+ self.assert_compile(stmt,
+ 'UPDATE C D mytable SET myid=:myid, name=:name, '
+ 'description=:description')
+
+ self.assert_compile(stmt,
+ 'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s',
+ dialect=mysql.dialect())
+
+ def test_alias(self):
+ table1 = self.tables.mytable
+ talias1 = table1.alias('t1')
+
+ self.assert_compile(update(talias1, talias1.c.myid == 7),
+ 'UPDATE mytable AS t1 '
+ 'SET name=:name '
+ 'WHERE t1.myid = :myid_1',
+ params={table1.c.name: 'fred'})
+
+ self.assert_compile(update(talias1, table1.c.myid == 7),
+ 'UPDATE mytable AS t1 '
+ 'SET name=:name '
+ 'FROM mytable '
+ 'WHERE mytable.myid = :myid_1',
+ params={table1.c.name: 'fred'})
+
+ def test_update_to_expression(self):
+ """test update from an expression.
+
+ this logic is triggered currently by a left side that doesn't
+ have a key. The current supported use case is updating the index
+ of a Postgresql ARRAY type.
+
+ """
+ table1 = self.tables.mytable
+ expr = func.foo(table1.c.myid)
+ assert not hasattr(expr, 'key')
+ self.assert_compile(table1.update().values({expr: 'bar'}),
+ 'UPDATE mytable SET foo(myid)=:param_1')
+
+
+class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
+ AssertsCompiledSQL):
__dialect__ = 'default'
run_create_tables = run_inserts = run_deletes = None
def test_render_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1'),
- "UPDATE users SET name=:name FROM addresses "
- "WHERE users.id = addresses.user_id AND "
- "addresses.email_address = :email_address_1",
- checkparams={u'email_address_1': 'e1', 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1'),
+ 'UPDATE users '
+ 'SET name=:name FROM addresses '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = :email_address_1',
+ checkparams={u'email_address_1': 'e1', 'name': 'newname'})
def test_render_multi_table(self):
- users, addresses, dingalings = \
- self.tables.users, \
- self.tables.addresses, \
- self.tables.dingalings
+ users = self.tables.users
+ addresses = self.tables.addresses
+ dingalings = self.tables.dingalings
+
+ checkparams = {
+ u'email_address_1': 'e1',
+ u'id_1': 2,
+ 'name': 'newname'
+ }
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1').\
- where(addresses.c.id==dingalings.c.address_id).\
- where(dingalings.c.id==2),
- "UPDATE users SET name=:name FROM addresses, "
- "dingalings WHERE users.id = addresses.user_id "
- "AND addresses.email_address = :email_address_1 "
- "AND addresses.id = dingalings.address_id AND "
- "dingalings.id = :id_1",
- checkparams={u'email_address_1': 'e1', u'id_1': 2,
- 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1').
+ where(addresses.c.id == dingalings.c.address_id).
+ where(dingalings.c.id == 2),
+ 'UPDATE users '
+ 'SET name=:name '
+ 'FROM addresses, dingalings '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = :email_address_1 AND '
+ 'addresses.id = dingalings.address_id AND '
+ 'dingalings.id = :id_1',
+ checkparams=checkparams)
def test_render_table_mysql(self):
users, addresses = self.tables.users, self.tables.addresses
+
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==addresses.c.user_id).\
- where(addresses.c.email_address=='e1'),
- "UPDATE users, addresses SET users.name=%s "
- "WHERE users.id = addresses.user_id AND "
- "addresses.email_address = %s",
+ users.update().
+ values(name='newname').
+ where(users.c.id == addresses.c.user_id).
+ where(addresses.c.email_address == 'e1'),
+ 'UPDATE users, addresses '
+ 'SET users.name=%s '
+ 'WHERE '
+ 'users.id = addresses.user_id AND '
+ 'addresses.email_address = %s',
checkparams={u'email_address_1': 'e1', 'name': 'newname'},
- dialect=mysql.dialect()
- )
+ dialect=mysql.dialect())
def test_render_subquery(self):
users, addresses = self.tables.users, self.tables.addresses
- subq = select([addresses.c.id,
- addresses.c.user_id,
- addresses.c.email_address]).\
- where(addresses.c.id==7).alias()
+
+ checkparams = {
+ u'email_address_1': 'e1',
+ u'id_1': 7,
+ 'name': 'newname'
+ }
+
+ cols = [
+ addresses.c.id,
+ addresses.c.user_id,
+ addresses.c.email_address
+ ]
+
+ subq = select(cols).where(addresses.c.id == 7).alias()
self.assert_compile(
- users.update().\
- values(name='newname').\
- where(users.c.id==subq.c.user_id).\
- where(subq.c.email_address=='e1'),
- "UPDATE users SET name=:name FROM "
- "(SELECT addresses.id AS id, addresses.user_id "
- "AS user_id, addresses.email_address AS "
- "email_address FROM addresses WHERE addresses.id = "
- ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
- "AND anon_1.email_address = :email_address_1",
- checkparams={u'email_address_1': 'e1',
- u'id_1': 7, 'name': 'newname'}
- )
+ users.update().
+ values(name='newname').
+ where(users.c.id == subq.c.user_id).
+ where(subq.c.email_address == 'e1'),
+ 'UPDATE users '
+ 'SET name=:name FROM ('
+ 'SELECT '
+ 'addresses.id AS id, '
+ 'addresses.user_id AS user_id, '
+ 'addresses.email_address AS email_address '
+ 'FROM addresses '
+ 'WHERE addresses.id = :id_1'
+ ') AS anon_1 '
+ 'WHERE users.id = anon_1.user_id '
+ 'AND anon_1.email_address = :email_address_1',
+ checkparams=checkparams)
+
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
@testing.requires.update_from
def test_exec_two_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- testing.db.execute(
- addresses.select().\
- order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
@testing.requires.update_from
def test_exec_two_table_plus_alias(self):
users, addresses = self.tables.users, self.tables.addresses
- a1 = addresses.alias()
+ a1 = addresses.alias()
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==a1.c.user_id).\
- where(users.c.name=='ed').\
- where(a1.c.id==addresses.c.id)
- )
- eq_(
- testing.db.execute(
- addresses.select().\
- order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == a1.c.user_id).
+ where(users.c.name == 'ed').
+ where(a1.c.id == addresses.c.id)
)
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
@testing.requires.update_from
def test_exec_three_table(self):
- users, addresses, dingalings = \
- self.tables.users, \
- self.tables.addresses, \
- self.tables.dingalings
+ users = self.tables.users
+ addresses = self.tables.addresses
+ dingalings = self.tables.dingalings
+
testing.db.execute(
- addresses.update().\
- values(email_address=users.c.name).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed').
- where(addresses.c.id==dingalings.c.address_id).\
- where(dingalings.c.id==1),
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)
- ).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed@bettyboop.com"),
- (4, 8, 'x', "ed@lala.com"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
+ addresses.update().
+ values(email_address=users.c.name).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed').
+ where(addresses.c.id == dingalings.c.address_id).
+ where(dingalings.c.id == 1))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed@bettyboop.com'),
+ (4, 8, 'x', 'ed@lala.com'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
@testing.only_on('mysql', 'Multi table update')
def test_exec_multitable(self):
users, addresses = self.tables.users, self.tables.addresses
+
+ values = {
+ addresses.c.email_address: users.c.name,
+ users.c.name: 'ed2'
+ }
+
testing.db.execute(
- addresses.update().\
- values({
- addresses.c.email_address:users.c.name,
- users.c.name:'ed2'
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (1, 7, 'x', "jack@bean.com"),
- (2, 8, 'x', "ed"),
- (3, 8, 'x', "ed"),
- (4, 8, 'x', "ed"),
- (5, 9, 'x', "fred@fred.com")
- ]
- )
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (7, 'jack'),
- (8, 'ed2'),
- (9, 'fred'),
- (10, 'chuck')
- ]
- )
+ addresses.update().
+ values(values).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'x', 'ed'),
+ (3, 8, 'x', 'ed'),
+ (4, 8, 'x', 'ed'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ expected = [
+ (7, 'jack'),
+ (8, 'ed2'),
+ (9, 'fred'),
+ (10, 'chuck')]
+ self._assert_users(users, expected)
-class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
+ def _assert_addresses(self, addresses, expected):
+ stmt = addresses.select().order_by(addresses.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
+ def _assert_users(self, users, expected):
+ stmt = users.select().order_by(users.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
+
+class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
+ fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('name', String(30), nullable=False),
- Column('some_update', String(30), onupdate="im the update")
- )
+ Column('some_update', String(30), onupdate='im the update'))
Table('addresses', metadata,
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
- Column('email_address', String(50), nullable=False),
- )
+ Column('email_address', String(50), nullable=False))
@classmethod
def fixtures(cls):
return dict(
- users = (
+ users=(
('id', 'name', 'some_update'),
(8, 'ed', 'value'),
(9, 'fred', 'value'),
),
-
- addresses = (
+ addresses=(
('id', 'user_id', 'email_address'),
- (2, 8, "ed@wood.com"),
- (3, 8, "ed@bettyboop.com"),
- (4, 9, "fred@fred.com")
+ (2, 8, 'ed@wood.com'),
+ (3, 8, 'ed@bettyboop.com'),
+ (4, 9, 'fred@fred.com')
),
)
@testing.only_on('mysql', 'Multi table update')
def test_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
+ values = {
+ addresses.c.email_address: users.c.name,
+ users.c.name: 'ed2'
+ }
+
ret = testing.db.execute(
- addresses.update().\
- values({
- addresses.c.email_address:users.c.name,
- users.c.name:'ed2'
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- set(ret.prefetch_cols()),
- set([users.c.some_update])
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (2, 8, "ed"),
- (3, 8, "ed"),
- (4, 9, "fred@fred.com")
- ]
- )
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (8, 'ed2', 'im the update'),
- (9, 'fred', 'value'),
- ]
- )
+ addresses.update().
+ values(values).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
+
+ expected = [
+ (2, 8, 'ed'),
+ (3, 8, 'ed'),
+ (4, 9, 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ expected = [
+ (8, 'ed2', 'im the update'),
+ (9, 'fred', 'value')]
+ self._assert_users(users, expected)
@testing.only_on('mysql', 'Multi table update')
def test_no_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
+
ret = testing.db.execute(
- addresses.update().\
- values({
- 'email_address':users.c.name,
- }).\
- where(users.c.id==addresses.c.user_id).\
- where(users.c.name=='ed')
- )
- eq_(
- ret.prefetch_cols(),[]
- )
- eq_(
- testing.db.execute(
- addresses.select().order_by(addresses.c.id)).fetchall(),
- [
- (2, 8, "ed"),
- (3, 8, "ed"),
- (4, 9, "fred@fred.com")
- ]
- )
- # users table not actually updated,
- # so no onupdate
- eq_(
- testing.db.execute(
- users.select().order_by(users.c.id)).fetchall(),
- [
- (8, 'ed', 'value'),
- (9, 'fred', 'value'),
- ]
- )
+ addresses.update().
+ values({'email_address': users.c.name}).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ eq_(ret.prefetch_cols(), [])
+
+ expected = [
+ (2, 8, 'ed'),
+ (3, 8, 'ed'),
+ (4, 9, 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ # users table not actually updated, so no onupdate
+ expected = [
+ (8, 'ed', 'value'),
+ (9, 'fred', 'value')]
+ self._assert_users(users, expected)
+
+ def _assert_addresses(self, addresses, expected):
+ stmt = addresses.select().order_by(addresses.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
+ def _assert_users(self, users, expected):
+ stmt = users.select().order_by(users.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)