diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-07-25 16:04:35 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-07-25 16:04:35 -0400 |
commit | 31178db91455ead5bfd4269658073c745e090569 (patch) | |
tree | 1db071c0e830d28e03e5ca55d07a7e2131b5935a | |
parent | 6b60d3a9e6ba93d177ac777bfaae8269c18ddee6 (diff) | |
download | sqlalchemy-31178db91455ead5bfd4269658073c745e090569.tar.gz |
- flake8 all of test/dialect/postgresql
- add __backend__ to most tests so that pg8000 can start coming in
-rw-r--r-- | test/dialect/postgresql/test_compiler.py | 285 | ||||
-rw-r--r-- | test/dialect/postgresql/test_dialect.py | 28 | ||||
-rw-r--r-- | test/dialect/postgresql/test_query.py | 454 | ||||
-rw-r--r-- | test/dialect/postgresql/test_reflection.py | 315 | ||||
-rw-r--r-- | test/dialect/postgresql/test_types.py | 671 |
5 files changed, 995 insertions, 758 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index 76166b6dd..c71852d90 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -1,15 +1,11 @@ # coding: utf-8 -from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, assert_raises +from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \ + assert_raises from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing -import datetime -from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ - String, Sequence, ForeignKey, join, Numeric, \ - PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ - func, literal_column, literal, bindparam, cast, extract, \ - SmallInteger, Enum, REAL, update, insert, Index, delete, \ - and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy import Sequence, Table, Column, Integer, update, String,\ + insert, func, MetaData, Enum, Index, and_, delete, select, cast from sqlalchemy.dialects.postgresql import ExcludeConstraint, array from sqlalchemy import exc, schema from sqlalchemy.dialects.postgresql import base as postgresql @@ -18,6 +14,7 @@ from sqlalchemy.orm import mapper, aliased, Session from sqlalchemy.sql import table, column, operators from sqlalchemy.util import u + class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): __prefer__ = 'postgresql' @@ -45,24 +42,34 @@ class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): ('tb4', 'abc'), ]: t = Table(tname[:57], - metadata, - Column(cname[:57], Integer, primary_key=True) - ) + metadata, + Column(cname[:57], Integer, primary_key=True) + ) t.create(engine) r = engine.execute(t.insert()) assert r.inserted_primary_key == [1] + class CompileTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = postgresql.dialect() def test_update_returning(self): dialect = postgresql.dialect() - table1 = table('mytable', column('myid', Integer), column('name' - , String(128)), column('description', - String(128))) - u = update(table1, values=dict(name='foo' - )).returning(table1.c.myid, table1.c.name) + table1 = table( + 'mytable', + column( + 'myid', Integer), + column( + 'name', String(128)), + column( + 'description', String(128))) + u = update( + table1, + values=dict( + name='foo')).returning( + table1.c.myid, + table1.c.name) self.assert_compile(u, 'UPDATE mytable SET name=%(name)s ' 'RETURNING mytable.myid, mytable.name', @@ -73,23 +80,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'RETURNING mytable.myid, mytable.name, ' 'mytable.description', dialect=dialect) u = update(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(u, - 'UPDATE mytable SET name=%(name)s ' - 'RETURNING length(mytable.name) AS length_1' - , dialect=dialect) - + )).returning(func.length(table1.c.name)) + self.assert_compile( + u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING length(mytable.name) AS length_1', + dialect=dialect) def test_insert_returning(self): dialect = postgresql.dialect() table1 = table('mytable', - column('myid', Integer), - column('name', String(128)), - column('description', String(128)), - ) - - i = insert(table1, values=dict(name='foo' - )).returning(table1.c.myid, table1.c.name) + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + i = insert( + table1, + values=dict( + name='foo')).returning( + table1.c.myid, + table1.c.name) self.assert_compile(i, 'INSERT INTO mytable (name) VALUES ' '(%(name)s) RETURNING mytable.myid, ' @@ -101,27 +112,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'mytable.name, mytable.description', dialect=dialect) i = insert(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) + )).returning(func.length(table1.c.name)) self.assert_compile(i, 'INSERT INTO mytable (name) VALUES ' '(%(name)s) RETURNING length(mytable.name) ' 'AS length_1', dialect=dialect) - def test_create_drop_enum(self): # test escaping and unicode within CREATE TYPE for ENUM typ = postgresql.ENUM( - "val1", "val2", "val's 3", u('méil'), name="myname") - self.assert_compile(postgresql.CreateEnumType(typ), - u("CREATE TYPE myname AS ENUM ('val1', 'val2', 'val''s 3', 'méil')") - ) + "val1", "val2", "val's 3", u('méil'), name="myname") + self.assert_compile( + postgresql.CreateEnumType(typ), + u("CREATE TYPE myname AS " + "ENUM ('val1', 'val2', 'val''s 3', 'méil')")) typ = postgresql.ENUM( - "val1", "val2", "val's 3", name="PleaseQuoteMe") + "val1", "val2", "val's 3", name="PleaseQuoteMe") self.assert_compile(postgresql.CreateEnumType(typ), - "CREATE TYPE \"PleaseQuoteMe\" AS ENUM " - "('val1', 'val2', 'val''s 3')" - ) + "CREATE TYPE \"PleaseQuoteMe\" AS ENUM " + "('val1', 'val2', 'val''s 3')" + ) def test_generic_enum(self): e1 = Enum('x', 'y', 'z', name='somename') @@ -140,8 +151,16 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(schema.CreateTable(t1), 'CREATE TABLE sometable (somecolumn ' 'somename)') - t1 = Table('sometable', MetaData(), Column('somecolumn', - Enum('x', 'y', 'z', native_enum=False))) + t1 = Table( + 'sometable', + MetaData(), + Column( + 'somecolumn', + Enum( + 'x', + 'y', + 'z', + native_enum=False))) self.assert_compile(schema.CreateTable(t1), "CREATE TABLE sometable (somecolumn " "VARCHAR(1), CHECK (somecolumn IN ('x', " @@ -152,16 +171,16 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): tbl = Table('testtbl', m, Column('data', Integer)) idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data - < 10)) + < 10)) idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data - < 10)) + < 10)) # test quoting and all that idx2 = Index('test_idx2', tbl.c.data, postgresql_where=and_(tbl.c.data > 'a', tbl.c.data - < "b's")) + < "b's")) self.assert_compile(schema.CreateIndex(idx), 'CREATE INDEX test_idx1 ON testtbl (data) ' 'WHERE data > 5 AND data < 10', @@ -181,8 +200,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): postgresql_ops={'data': 'text_pattern_ops'}) idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2, - postgresql_ops={'data': 'text_pattern_ops', - 'd2': 'int4_ops'}) + postgresql_ops={'data': 'text_pattern_ops', + 'd2': 'int4_ops'}) self.assert_compile(schema.CreateIndex(idx), 'CREATE INDEX test_idx1 ON testtbl ' @@ -214,7 +233,6 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'USING hash (data)', dialect=postgresql.dialect()) - def test_create_index_expr_gets_parens(self): m = MetaData() tbl = Table('testtbl', m, Column('x', Integer), Column('y', Integer)) @@ -271,8 +289,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): m = MetaData() cons = ExcludeConstraint(('room', '=')) tbl = Table('testtbl', m, - Column('room', Integer, primary_key=True), - cons) + Column('room', Integer, primary_key=True), + cons) # apparently you can't copy a ColumnCollectionConstraint until # after it has been bound to a table... cons_copy = cons.copy() @@ -289,10 +307,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(func.substring('abc', 1), 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') - def test_for_update(self): table1 = table('mytable', - column('myid'), column('name'), column('description')) + column('myid'), column('name'), column('description')) self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(), @@ -311,35 +328,35 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True), + with_for_update(read=True, nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT") self.assert_compile( table1.select(table1.c.myid == 7). - with_for_update(of=table1.c.myid), + with_for_update(of=table1.c.myid), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " "FOR UPDATE OF mytable") self.assert_compile( table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, of=table1), + with_for_update(read=True, nowait=True, of=table1), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " "FOR SHARE OF mytable NOWAIT") self.assert_compile( table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, of=table1.c.myid), + with_for_update(read=True, nowait=True, of=table1.c.myid), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " "FOR SHARE OF mytable NOWAIT") self.assert_compile( table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, - of=[table1.c.myid, table1.c.name]), + with_for_update(read=True, nowait=True, + of=[table1.c.myid, table1.c.name]), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " "FOR SHARE OF mytable NOWAIT") @@ -347,20 +364,20 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ta = table1.alias() self.assert_compile( ta.select(ta.c.myid == 7). - with_for_update(of=[ta.c.myid, ta.c.name]), + with_for_update(of=[ta.c.myid, ta.c.name]), "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " "FROM mytable AS mytable_1 " "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1" ) - def test_reserved_words(self): table = Table("pg_table", MetaData(), - Column("col1", Integer), - Column("variadic", Integer)) + Column("col1", Integer), + Column("variadic", Integer)) x = select([table.c.col1, table.c.variadic]) - self.assert_compile(x, + self.assert_compile( + x, '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''') def test_array(self): @@ -384,12 +401,12 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( c[5:7][2:3], "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", - checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3} + checkparams={'x_2': 7, 'x_1': 5, 'param_1': 2, 'param_2': 3} ) self.assert_compile( c[5:7][3], "x[%(x_1)s:%(x_2)s][%(param_1)s]", - checkparams={'x_2': 7, 'x_1': 5, 'param_1':3} + checkparams={'x_2': 7, 'x_1': 5, 'param_1': 3} ) self.assert_compile( @@ -452,13 +469,13 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): c[5:7][2:3], "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", checkparams={'x_2': 7 + add_one, 'x_1': 5 + add_one, - 'param_1': 2 + add_one, 'param_2': 3 + add_one} + 'param_1': 2 + add_one, 'param_2': 3 + add_one} ) self.assert_compile( c[5:7][3], "x[%(x_1)s:%(x_2)s][%(param_1)s]", checkparams={'x_2': 7 + add_one, 'x_1': 5 + add_one, - 'param_1': 3 + add_one} + 'param_1': 3 + add_one} ) def test_array_zero_indexes_true(self): @@ -472,16 +489,16 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) is_(postgresql.array([1, 2], type_=String). - type.item_type._type_affinity, String) + type.item_type._type_affinity, String) def test_array_literal(self): self.assert_compile( func.array_dims(postgresql.array([1, 2]) + - postgresql.array([3, 4, 5])), + postgresql.array([3, 4, 5])), "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " - "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", + "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1, - 'param_3': 3, 'param_2': 2} + 'param_3': 3, 'param_2': 2} ) def test_array_literal_insert(self): @@ -490,7 +507,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( t.insert().values(data=array([1, 2, 3])), "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " - "%(param_2)s, %(param_3)s])" + "%(param_2)s, %(param_3)s])" ) def test_update_array_element(self): @@ -548,13 +565,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): expected = 'UPDATE ONLY testtbl1 SET id=%(id)s' self.assert_compile(stmt, expected) - stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql') + stmt = delete(tbl1).with_hint( + 'ONLY', selectable=tbl1, dialect_name='postgresql') expected = 'DELETE FROM ONLY testtbl1' self.assert_compile(stmt, expected) tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema') stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql') - expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3' + expected = 'SELECT testschema.testtbl3.id FROM '\ + 'ONLY testschema.testtbl3' self.assert_compile(stmt, expected) assert_raises( @@ -564,8 +583,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) - class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): + """Test 'DISTINCT' with SQL expression language and orm.Query with an emphasis on PG's 'DISTINCT ON' syntax. @@ -574,10 +593,10 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): def setup(self): self.table = Table('t', MetaData(), - Column('id',Integer, primary_key=True), - Column('a', String), - Column('b', String), - ) + Column('id', Integer, primary_key=True), + Column('a', String), + Column('b', String), + ) def test_plain_generative(self): self.assert_compile( @@ -594,7 +613,7 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): def test_on_columns_generative_multi_call(self): self.assert_compile( select([self.table]).distinct(self.table.c.a). - distinct(self.table.c.b), + distinct(self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t" ) @@ -607,8 +626,8 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): def test_on_columns_inline_list(self): self.assert_compile( select([self.table], - distinct=[self.table.c.a, self.table.c.b]). - order_by(self.table.c.a, self.table.c.b), + distinct=[self.table.c.a, self.table.c.b]). + order_by(self.table.c.a, self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id, " "t.a, t.b FROM t ORDER BY t.a, t.b" ) @@ -639,13 +658,14 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): sess = Session() self.assert_compile( sess.query(self.table).distinct(self.table.c.a). - distinct(self.table.c.b), + distinct(self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t" ) def test_query_on_columns_subquery(self): sess = Session() + class Foo(object): pass mapper(Foo, self.table) @@ -673,44 +693,46 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): def test_distinct_on_subquery_anon(self): sq = select([self.table]).alias() - q = select([self.table.c.id,sq.c.id]).\ - distinct(sq.c.id).\ - where(self.table.c.id==sq.c.id) + q = select([self.table.c.id, sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id == sq.c.id) self.assert_compile( q, "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id " "FROM t, (SELECT t.id AS id, t.a AS a, t.b " "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id" - ) + ) def test_distinct_on_subquery_named(self): sq = select([self.table]).alias('sq') - q = select([self.table.c.id,sq.c.id]).\ - distinct(sq.c.id).\ - where(self.table.c.id==sq.c.id) + q = select([self.table.c.id, sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id == sq.c.id) self.assert_compile( q, "SELECT DISTINCT ON (sq.id) t.id, sq.id " "FROM t, (SELECT t.id AS id, t.a AS a, " "t.b AS b FROM t) AS sq WHERE t.id = sq.id" - ) + ) + class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL): + """Tests for full text searching """ __dialect__ = postgresql.dialect() def setup(self): self.table = Table('t', MetaData(), - Column('id', Integer, primary_key=True), - Column('title', String), - Column('body', String), - ) + Column('id', Integer, primary_key=True), + Column('title', String), + Column('body', String), + ) self.table_alt = table('mytable', - column('id', Integer), - column('title', String(128)), - column('body', String(128))) + column('id', Integer), + column('title', String(128)), + column('body', String(128))) def _raise_query(self, q): """ @@ -724,55 +746,50 @@ class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL): s = select([self.table_alt.c.id])\ .where(self.table_alt.c.title.match('somestring')) self.assert_compile(s, - 'SELECT mytable.id ' - 'FROM mytable ' - 'WHERE mytable.title @@ to_tsquery(%(title_1)s)') + 'SELECT mytable.id ' + 'FROM mytable ' + 'WHERE mytable.title @@ to_tsquery(%(title_1)s)') def test_match_regconfig(self): - s = select([self.table_alt.c.id])\ - .where( - self.table_alt.c.title.match('somestring', - postgresql_regconfig='english') - ) - self.assert_compile(s, - 'SELECT mytable.id ' + s = select([self.table_alt.c.id]).where( + self.table_alt.c.title.match( + 'somestring', + postgresql_regconfig='english') + ) + self.assert_compile( + s, 'SELECT mytable.id ' 'FROM mytable ' """WHERE mytable.title @@ to_tsquery('english', %(title_1)s)""") def test_match_tsvector(self): - s = select([self.table_alt.c.id])\ - .where( - func.to_tsvector( self.table_alt.c.title )\ - .match('somestring') - ) - self.assert_compile(s, - 'SELECT mytable.id ' + s = select([self.table_alt.c.id]).where( + func.to_tsvector(self.table_alt.c.title) + .match('somestring') + ) + self.assert_compile( + s, 'SELECT mytable.id ' 'FROM mytable ' - 'WHERE to_tsvector(mytable.title) @@ to_tsquery(%(to_tsvector_1)s)') + 'WHERE to_tsvector(mytable.title) ' + '@@ to_tsquery(%(to_tsvector_1)s)') def test_match_tsvectorconfig(self): - s = select([self.table_alt.c.id])\ - .where( - func.to_tsvector( 'english', self.table_alt.c.title )\ - .match('somestring') - ) - self.assert_compile(s, - 'SELECT mytable.id ' + s = select([self.table_alt.c.id]).where( + func.to_tsvector('english', self.table_alt.c.title) + .match('somestring') + ) + self.assert_compile( + s, 'SELECT mytable.id ' 'FROM mytable ' 'WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ ' - 'to_tsquery(%(to_tsvector_2)s)' - ) + 'to_tsquery(%(to_tsvector_2)s)') def test_match_tsvectorconfig_regconfig(self): - s = select([self.table_alt.c.id])\ - .where(\ - func.to_tsvector( 'english', self.table_alt.c.title )\ - .match('somestring', postgresql_regconfig='english') - ) - self.assert_compile(s, - 'SELECT mytable.id ' + s = select([self.table_alt.c.id]).where( + func.to_tsvector('english', self.table_alt.c.title) + .match('somestring', postgresql_regconfig='english') + ) + self.assert_compile( + s, 'SELECT mytable.id ' 'FROM mytable ' 'WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ ' - """to_tsquery('english', %(to_tsvector_2)s)""" - ) - + """to_tsquery('english', %(to_tsvector_2)s)""") diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index c96e79c13..a0f9e6895 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -19,6 +19,7 @@ from sqlalchemy.testing.mock import Mock class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): __only_on__ = 'postgresql' + __backend__ = True @testing.provide_metadata def test_date_reflection(self): @@ -129,15 +130,19 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): - datetime.timedelta(days=5) for field, exp in ('year', fivedaysago.year), \ ('month', fivedaysago.month), ('day', fivedaysago.day): - r = testing.db.execute(select([extract(field, func.now() - + datetime.timedelta(days=-5))])).scalar() + r = testing.db.execute( + select([ + extract(field, func.now() + datetime.timedelta(days=-5))]) + ).scalar() eq_(r, exp) def test_checksfor_sequence(self): meta1 = MetaData(testing.db) seq = Sequence('fooseq') - t = Table('mytable', meta1, Column('col1', Integer, - seq)) + t = Table( + 'mytable', meta1, + Column('col1', Integer, seq) + ) seq.drop() try: testing.db.execute('CREATE SEQUENCE fooseq') @@ -147,9 +152,10 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): def test_schema_roundtrips(self): meta = MetaData(testing.db) - users = Table('users', meta, Column('id', Integer, - primary_key=True), Column('name', String(50)), - schema='test_schema') + users = Table( + 'users', meta, Column( + 'id', Integer, primary_key=True), Column( + 'name', String(50)), schema='test_schema') users.create() try: users.insert().execute(id=1, name='name1') @@ -158,15 +164,15 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): users.insert().execute(id=4, name='name4') eq_(users.select().where(users.c.name == 'name2') .execute().fetchall(), [(2, 'name2')]) - eq_(users.select(use_labels=True).where(users.c.name - == 'name2').execute().fetchall(), [(2, 'name2')]) + eq_(users.select(use_labels=True).where( + users.c.name == 'name2').execute().fetchall(), [(2, 'name2')]) users.delete().where(users.c.id == 3).execute() eq_(users.select().where(users.c.name == 'name3') .execute().fetchall(), []) users.update().where(users.c.name == 'name4' ).execute(name='newname') - eq_(users.select(use_labels=True).where(users.c.id - == 4).execute().fetchall(), [(4, 'newname')]) + eq_(users.select(use_labels=True).where( + users.c.id == 4).execute().fetchall(), [(4, 'newname')]) finally: users.drop() diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py index 064f0c372..a512b56fa 100644 --- a/test/dialect/postgresql/test_query.py +++ b/test/dialect/postgresql/test_query.py @@ -1,23 +1,23 @@ # coding: utf-8 -from sqlalchemy.testing.assertions import eq_, assert_raises, \ - assert_raises_message, is_, AssertsExecutionResults, \ - AssertsCompiledSQL, ComparesTables +from sqlalchemy.testing import AssertsExecutionResults, eq_, \ + assert_raises_message, AssertsCompiledSQL +from sqlalchemy import Table, Column, MetaData, Integer, String, bindparam, \ + Sequence, ForeignKey, text, select, func, extract, literal_column, \ + tuple_, DateTime, Time, literal, and_, Date, or_ from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing -from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ - String, Sequence, ForeignKey, join, Numeric, \ - PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ - func, literal_column, literal, bindparam, cast, extract, \ - SmallInteger, Enum, REAL, update, insert, Index, delete, \ - and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text from sqlalchemy import exc from sqlalchemy.dialects import postgresql import datetime +metadata = matchtable = cattable = None + + class InsertTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' + __backend__ = True @classmethod def setup_class(cls): @@ -32,20 +32,32 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): self.engine.dispose() def test_compiled_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - primary_key=True), Column('data', String(30))) + table = Table( + 'testtable', metadata, Column( + 'id', Integer, primary_key=True), + Column( + 'data', String(30))) metadata.create_all() - ins = table.insert(inline=True, values={'data': bindparam('x' - )}).compile() + ins = table.insert( + inline=True, + values={'data': bindparam('x')}).compile() ins.execute({'x': 'five'}, {'x': 'seven'}) - assert table.select().execute().fetchall() == [(1, 'five'), (2, - 'seven')] + eq_( + table.select().execute().fetchall(), + [(1, 'five'), (2, 'seven')] + ) def test_foreignkey_missing_insert(self): t1 = Table('t1', metadata, Column('id', Integer, - primary_key=True)) - t2 = Table('t2', metadata, Column('id', Integer, - ForeignKey('t1.id'), primary_key=True)) + primary_key=True)) + t2 = Table( + 't2', + metadata, + Column( + 'id', + Integer, + ForeignKey('t1.id'), + primary_key=True)) metadata.create_all() # want to ensure that "null value in column "id" violates not- @@ -55,68 +67,107 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): # the latter corresponds to autoincrement behavior, which is not # the case here due to the foreign key. - for eng in [engines.testing_engine(options={'implicit_returning' - : False}), - engines.testing_engine(options={'implicit_returning' - : True})]: + for eng in [ + engines.testing_engine(options={'implicit_returning': False}), + engines.testing_engine(options={'implicit_returning': True}) + ]: assert_raises_message(exc.DBAPIError, 'violates not-null constraint', eng.execute, t2.insert()) def test_sequence_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - Sequence('my_seq'), primary_key=True), - Column('data', String(30))) + table = Table( + 'testtable', + metadata, + Column( + 'id', + Integer, + Sequence('my_seq'), + primary_key=True), + Column( + 'data', + String(30))) metadata.create_all() self._assert_data_with_sequence(table, 'my_seq') @testing.requires.returning def test_sequence_returning_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - Sequence('my_seq'), primary_key=True), - Column('data', String(30))) + table = Table( + 'testtable', + metadata, + Column( + 'id', + Integer, + Sequence('my_seq'), + primary_key=True), + Column( + 'data', + String(30))) metadata.create_all() self._assert_data_with_sequence_returning(table, 'my_seq') def test_opt_sequence_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - Sequence('my_seq', optional=True), - primary_key=True), Column('data', String(30))) + table = Table( + 'testtable', metadata, + Column( + 'id', Integer, Sequence( + 'my_seq', optional=True), primary_key=True), + Column( + 'data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) @testing.requires.returning def test_opt_sequence_returning_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - Sequence('my_seq', optional=True), - primary_key=True), Column('data', String(30))) + table = Table( + 'testtable', metadata, + Column( + 'id', Integer, Sequence( + 'my_seq', optional=True), primary_key=True), + Column( + 'data', String(30))) metadata.create_all() self._assert_data_autoincrement_returning(table) def test_autoincrement_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - primary_key=True), Column('data', String(30))) + table = Table( + 'testtable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) @testing.requires.returning def test_autoincrement_returning_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - primary_key=True), Column('data', String(30))) + table = Table( + 'testtable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'data', String(30))) metadata.create_all() self._assert_data_autoincrement_returning(table) def test_noautoincrement_insert(self): - table = Table('testtable', metadata, Column('id', Integer, - primary_key=True, autoincrement=False), - Column('data', String(30))) + table = Table( + 'testtable', + metadata, + Column( + 'id', + Integer, + primary_key=True, + autoincrement=False), + Column( + 'data', + String(30))) metadata.create_all() self._assert_data_noautoincrement(table) def _assert_data_autoincrement(self, table): self.engine = \ - engines.testing_engine(options={'implicit_returning' - : False}) + engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine def go(): @@ -134,7 +185,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): # executemany with explicit ids table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) # executemany, uses SERIAL @@ -158,13 +209,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): {'id': 1, 'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd8'}]), - ]) + ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), @@ -174,7 +224,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (3, 'd6'), (33, 'd7'), (4, 'd8'), - ] + ] table.delete().execute() # test the same series of events using a reflected version of @@ -188,7 +238,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) @@ -200,13 +250,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): {'id': 5, 'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd8'}]), - ]) + ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (5, 'd2'), @@ -216,7 +265,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (7, 'd6'), (33, 'd7'), (8, 'd8'), - ] + ] table.delete().execute() def _assert_data_autoincrement_returning(self, table): @@ -239,7 +288,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): # executemany with explicit ids table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) # executemany, uses SERIAL @@ -260,13 +309,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): 'testtable.id', {'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd8'}]), - ]) + ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), @@ -276,7 +324,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (3, 'd6'), (33, 'd7'), (4, 'd8'), - ] + ] table.delete().execute() # test the same series of events using a reflected version of @@ -290,7 +338,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) @@ -302,13 +350,12 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): 'testtable.id', {'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (data) VALUES (:data)', + [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), - ('INSERT INTO testtable (data) VALUES (:data)', [{'data' - : 'd8'}]), - ]) + ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]), + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (5, 'd2'), @@ -318,20 +365,19 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (7, 'd6'), (33, 'd7'), (8, 'd8'), - ] + ] table.delete().execute() def _assert_data_with_sequence(self, table, seqname): self.engine = \ - engines.testing_engine(options={'implicit_returning' - : False}) + engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine def go(): table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) @@ -349,7 +395,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): [{'id': 33, 'data': 'd7'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), - ]) + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), @@ -359,7 +405,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (3, 'd6'), (33, 'd7'), (4, 'd8'), - ] + ] # cant test reflection here since the Sequence must be # explicitly specified @@ -373,7 +419,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, - 'data': 'd4'}) + 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) @@ -392,7 +438,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): [{'id': 33, 'data': 'd7'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), - ]) + ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), @@ -402,15 +448,14 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): (3, 'd6'), (33, 'd7'), (4, 'd8'), - ] + ] # cant test reflection here since the Sequence must be # explicitly specified def _assert_data_noautoincrement(self, table): self.engine = \ - engines.testing_engine(options={'implicit_returning' - : False}) + engines.testing_engine(options={'implicit_returning': False}) metadata.bind = self.engine table.insert().execute({'id': 30, 'data': 'd1'}) if self.engine.driver == 'pg8000': @@ -434,10 +479,13 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert().execute, {'data': 'd2'}, {'data': 'd3'}) table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, - 'data': 'd3'}) + 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) - assert table.select().execute().fetchall() == [(30, 'd1'), (31, - 'd2'), (32, 'd3'), (33, 'd4')] + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (31, 'd2'), + (32, 'd3'), + (33, 'd4')] table.delete().execute() # test the same series of events using a reflected version of @@ -454,10 +502,13 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults): table.insert().execute, {'data': 'd2'}, {'data': 'd3'}) table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, - 'data': 'd3'}) + 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) - assert table.select().execute().fetchall() == [(30, 'd1'), (31, - 'd2'), (32, 'd3'), (33, 'd4')] + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (31, 'd2'), + (32, 'd3'), + (33, 'd4')] class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): @@ -466,8 +517,8 @@ class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): def _fixture(self, server_side_cursors): self.engine = engines.testing_engine( - options={'server_side_cursors':server_side_cursors} - ) + options={'server_side_cursors': server_side_cursors} + ) return self.engine def tearDown(self): @@ -507,15 +558,14 @@ class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): assert result.cursor.name - def test_conn_option(self): engine = self._fixture(False) # and this one result = \ engine.connect().execution_options(stream_results=True).\ - execute('select 1' - ) + execute('select 1' + ) assert result.cursor.name def test_stmt_enabled_conn_option_disabled(self): @@ -526,7 +576,7 @@ class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): # not this one result = \ engine.connect().execution_options(stream_results=False).\ - execute(s) + execute(s) assert not result.cursor.name def test_stmt_option_disabled(self): @@ -581,12 +631,13 @@ class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): nextid = engine.execute(Sequence('test_table_id_seq')) test_table.insert().execute(id=nextid, data='data2') eq_(test_table.select().execute().fetchall(), [(1, 'data1' - ), (2, 'data2')]) - test_table.update().where(test_table.c.id - == 2).values(data=test_table.c.data + ' updated' - ).execute() - eq_(test_table.select().execute().fetchall(), [(1, 'data1' - ), (2, 'data2 updated')]) + ), (2, 'data2')]) + test_table.update().where( + test_table.c.id == 2).values( + data=test_table.c.data + + ' updated').execute() + eq_(test_table.select().execute().fetchall(), + [(1, 'data1'), (2, 'data2 updated')]) test_table.delete().execute() eq_(test_table.count().scalar(), 0) finally: @@ -596,36 +647,37 @@ class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): class MatchTest(fixtures.TestBase, AssertsCompiledSQL): __only_on__ = 'postgresql >= 8.3' + __backend__ = True @classmethod def setup_class(cls): global metadata, cattable, matchtable metadata = MetaData(testing.db) - cattable = Table('cattable', metadata, Column('id', Integer, - primary_key=True), Column('description', - String(50))) - matchtable = Table('matchtable', metadata, Column('id', - Integer, primary_key=True), Column('title', - String(200)), Column('category_id', Integer, - ForeignKey('cattable.id'))) + cattable = Table( + 'cattable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'description', String(50))) + matchtable = Table( + 'matchtable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'title', String(200)), + Column( + 'category_id', Integer, ForeignKey('cattable.id'))) metadata.create_all() cattable.insert().execute([{'id': 1, 'description': 'Python'}, - {'id': 2, 'description': 'Ruby'}]) - matchtable.insert().execute([{'id': 1, 'title' - : 'Agile Web Development with Rails' - , 'category_id': 2}, - {'id': 2, - 'title': 'Dive Into Python', - 'category_id': 1}, - {'id': 3, 'title' - : "Programming Matz's Ruby", - 'category_id': 2}, - {'id': 4, 'title' - : 'The Definitive Guide to Django', - 'category_id': 1}, - {'id': 5, 'title' - : 'Python in a Nutshell', - 'category_id': 1}]) + {'id': 2, 'description': 'Ruby'}]) + matchtable.insert().execute( + [{'id': 1, 'title': 'Agile Web Development with Rails', + 'category_id': 2}, + {'id': 2, 'title': 'Dive Into Python', 'category_id': 1}, + {'id': 3, 'title': "Programming Matz's Ruby", 'category_id': 2}, + {'id': 4, 'title': 'The Definitive Guide to Django', + 'category_id': 1}, + {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}]) @classmethod def teardown_class(cls): @@ -646,58 +698,58 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): 'matchtable.title @@ to_tsquery(%s)') def test_simple_match(self): - results = \ - matchtable.select().where(matchtable.c.title.match('python' - )).order_by(matchtable.c.id).execute().fetchall() + results = matchtable.select().where( + matchtable.c.title.match('python')).order_by( + matchtable.c.id).execute().fetchall() eq_([2, 5], [r.id for r in results]) def test_simple_match_with_apostrophe(self): - results = \ - matchtable.select().where(matchtable.c.title.match("Matz's" - )).execute().fetchall() + results = matchtable.select().where( + matchtable.c.title.match("Matz's")).execute().fetchall() eq_([3], [r.id for r in results]) def test_simple_derivative_match(self): - results = \ - matchtable.select().where(matchtable.c.title.match('nutshells' - )).execute().fetchall() + results = matchtable.select().where( + matchtable.c.title.match('nutshells')).execute().fetchall() eq_([5], [r.id for r in results]) def test_or_match(self): - results1 = \ - matchtable.select().where(or_(matchtable.c.title.match('nutshells' - ), matchtable.c.title.match('rubies' - ))).order_by(matchtable.c.id).execute().fetchall() + results1 = matchtable.select().where( + or_( + matchtable.c.title.match('nutshells'), + matchtable.c.title.match('rubies'))).order_by( + matchtable.c.id).execute().fetchall() eq_([3, 5], [r.id for r in results1]) - results2 = \ - matchtable.select().where( - matchtable.c.title.match('nutshells | rubies' - )).order_by(matchtable.c.id).execute().fetchall() + results2 = matchtable.select().where( + matchtable.c.title.match('nutshells | rubies')).order_by( + matchtable.c.id).execute().fetchall() eq_([3, 5], [r.id for r in results2]) def test_and_match(self): - results1 = \ - matchtable.select().where(and_(matchtable.c.title.match('python' - ), matchtable.c.title.match('nutshells' - ))).execute().fetchall() + results1 = matchtable.select().where( + and_( + matchtable.c.title.match('python'), + matchtable.c.title.match('nutshells'))).execute().fetchall() eq_([5], [r.id for r in results1]) results2 = \ matchtable.select().where( matchtable.c.title.match('python & nutshells' - )).execute().fetchall() + )).execute().fetchall() eq_([5], [r.id for r in results2]) def test_match_across_joins(self): - results = matchtable.select().where(and_(cattable.c.id - == matchtable.c.category_id, - or_(cattable.c.description.match('Ruby'), - matchtable.c.title.match('nutshells' - )))).order_by(matchtable.c.id).execute().fetchall() + results = matchtable.select().where( + and_( + cattable.c.id == matchtable.c.category_id, or_( + cattable.c.description.match('Ruby'), + matchtable.c.title.match('nutshells')))).order_by( + matchtable.c.id).execute().fetchall() eq_([1, 3, 5], [r.id for r in results]) class TupleTest(fixtures.TestBase): __only_on__ = 'postgresql' + __backend__ = True def test_tuple_containment(self): @@ -710,24 +762,24 @@ class TupleTest(fixtures.TestBase): eq_( testing.db.execute( select([ - tuple_( - literal_column("'a'"), - literal_column("'b'") - ).\ - in_([ - tuple_(*[ - literal_column("'%s'" % letter) - for letter in elem - ]) for elem in test - ]) - ]) + tuple_( + literal_column("'a'"), + literal_column("'b'") + ). + in_([ + tuple_(*[ + literal_column("'%s'" % letter) + for letter in elem + ]) for elem in test + ]) + ]) ).scalar(), exp ) - class ExtractTest(fixtures.TablesTest): + """The rationale behind this test is that for many years we've had a system of embedding type casts into the expressions rendered by visit_extract() on the postgreql platform. The reason for this cast is not clear. @@ -736,6 +788,7 @@ class ExtractTest(fixtures.TablesTest): """ __only_on__ = 'postgresql' + __backend__ = True run_inserts = 'once' run_deletes = None @@ -743,19 +796,20 @@ class ExtractTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table('t', metadata, - Column('id', Integer, primary_key=True), - Column('dtme', DateTime), - Column('dt', Date), - Column('tm', Time), - Column('intv', postgresql.INTERVAL), - Column('dttz', DateTime(timezone=True)) - ) + Column('id', Integer, primary_key=True), + Column('dtme', DateTime), + Column('dt', Date), + Column('tm', Time), + Column('intv', postgresql.INTERVAL), + Column('dttz', DateTime(timezone=True)) + ) @classmethod def insert_data(cls): # TODO: why does setting hours to anything # not affect the TZ in the DB col ? class TZ(datetime.tzinfo): + def utcoffset(self, dt): return datetime.timedelta(hours=4) @@ -780,18 +834,18 @@ class ExtractTest(fixtures.TablesTest): if field == "all": fields = {"year": 2012, "month": 5, "day": 10, - "epoch": 1336652125.0, - "hour": 12, "minute": 15} + "epoch": 1336652125.0, + "hour": 12, "minute": 15} elif field == "time": fields = {"hour": 12, "minute": 15, "second": 25} elif field == 'date': fields = {"year": 2012, "month": 5, "day": 10} elif field == 'all+tz': fields = {"year": 2012, "month": 5, "day": 10, - "epoch": 1336637725.0, - "hour": 8, - "timezone": 0 - } + "epoch": 1336637725.0, + "hour": 8, + "timezone": 0 + } else: fields = field @@ -800,7 +854,7 @@ class ExtractTest(fixtures.TablesTest): for field in fields: result = testing.db.scalar( - select([extract(field, expr)]).select_from(t)) + select([extract(field, expr)]).select_from(t)) eq_(result, fields[field]) def test_one(self): @@ -810,45 +864,45 @@ class ExtractTest(fixtures.TablesTest): def test_two(self): t = self.tables.t self._test(t.c.dtme + t.c.intv, - overrides={"epoch": 1336652695.0, "minute": 24}) + overrides={"epoch": 1336652695.0, "minute": 24}) def test_three(self): t = self.tables.t actual_ts = testing.db.scalar(func.current_timestamp()) - \ - datetime.timedelta(days=5) + datetime.timedelta(days=5) self._test(func.current_timestamp() - datetime.timedelta(days=5), - {"hour": actual_ts.hour, "year": actual_ts.year, - "month": actual_ts.month} - ) + {"hour": actual_ts.hour, "year": actual_ts.year, + "month": actual_ts.month} + ) def test_four(self): t = self.tables.t self._test(datetime.timedelta(days=5) + t.c.dt, - overrides={"day": 15, "epoch": 1337040000.0, "hour": 0, - "minute": 0} - ) + overrides={"day": 15, "epoch": 1337040000.0, "hour": 0, + "minute": 0} + ) def test_five(self): t = self.tables.t self._test(func.coalesce(t.c.dtme, func.current_timestamp()), - overrides={"epoch": 1336652125.0}) + overrides={"epoch": 1336652125.0}) def test_six(self): t = self.tables.t self._test(t.c.tm + datetime.timedelta(seconds=30), "time", - overrides={"second": 55}) + overrides={"second": 55}) def test_seven(self): self._test(literal(datetime.timedelta(seconds=10)) - - literal(datetime.timedelta(seconds=10)), "all", - overrides={"hour": 0, "minute": 0, "month": 0, - "year": 0, "day": 0, "epoch": 0}) + - literal(datetime.timedelta(seconds=10)), "all", + overrides={"hour": 0, "minute": 0, "month": 0, + "year": 0, "day": 0, "epoch": 0}) def test_eight(self): t = self.tables.t self._test(t.c.tm + datetime.timedelta(seconds=30), - {"hour": 12, "minute": 15, "second": 55}) + {"hour": 12, "minute": 15, "second": 55}) def test_nine(self): self._test(text("t.dt + t.tm")) @@ -859,19 +913,21 @@ class ExtractTest(fixtures.TablesTest): def test_eleven(self): self._test(func.current_timestamp() - func.current_timestamp(), - {"year": 0, "month": 0, "day": 0, "hour": 0} - ) + {"year": 0, "month": 0, "day": 0, "hour": 0} + ) def test_twelve(self): t = self.tables.t actual_ts = testing.db.scalar( - func.current_timestamp()).replace(tzinfo=None) - \ - datetime.datetime(2012, 5, 10, 12, 15, 25) + func.current_timestamp()).replace(tzinfo=None) - \ + datetime.datetime(2012, 5, 10, 12, 15, 25) - self._test(func.current_timestamp() - func.coalesce(t.c.dtme, - func.current_timestamp()), - {"day": actual_ts.days} - ) + self._test( + func.current_timestamp() - func.coalesce( + t.c.dtme, + func.current_timestamp() + ), + {"day": actual_ts.days}) def test_thirteen(self): t = self.tables.t @@ -884,5 +940,5 @@ class ExtractTest(fixtures.TablesTest): def test_fifteen(self): t = self.tables.t self._test(datetime.timedelta(days=5) + t.c.dtme, - overrides={"day": 15, "epoch": 1337084125.0} - ) + overrides={"day": 15, "epoch": 1337084125.0} + ) diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index 01e14ffa0..1d6a41765 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -1,40 +1,36 @@ # coding: utf-8 from sqlalchemy.testing.assertions import eq_, assert_raises, \ - assert_raises_message, is_, AssertsExecutionResults, \ - AssertsCompiledSQL, ComparesTables -from sqlalchemy.testing import engines, fixtures + AssertsExecutionResults +from sqlalchemy.testing import fixtures from sqlalchemy import testing from sqlalchemy import inspect -from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ - String, Sequence, ForeignKey, join, Numeric, \ - PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ - func, literal_column, literal, bindparam, cast, extract, \ - SmallInteger, Enum, REAL, update, insert, Index, delete, \ - and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy import Table, Column, MetaData, Integer, String, \ + PrimaryKeyConstraint, ForeignKey, join, Sequence from sqlalchemy import exc +import sqlalchemy as sa from sqlalchemy.dialects.postgresql import base as postgresql -import logging + class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): """Test PostgreSQL domains""" __only_on__ = 'postgresql > 8.3' + __backend__ = True @classmethod def setup_class(cls): con = testing.db.connect() for ddl in \ - 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \ - 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \ - "CREATE TYPE testtype AS ENUM ('test')", \ - 'CREATE DOMAIN enumdomain AS testtype'\ - : + 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \ + 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \ + "CREATE TYPE testtype AS ENUM ('test')", \ + 'CREATE DOMAIN enumdomain AS testtype': try: con.execute(ddl) except exc.DBAPIError as e: - if not 'already exists' in str(e): + if 'already exists' not in str(e): raise e con.execute('CREATE TABLE testtable (question integer, answer ' 'testdomain)') @@ -86,7 +82,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): table = Table('testtable', metadata, autoload=True, schema='test_schema') eq_(set(table.columns.keys()), set(['question', 'answer', - 'anything']), + 'anything']), "Columns of reflected table didn't equal expected columns") assert isinstance(table.c.anything.type, Integer) @@ -127,9 +123,10 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): class ReflectionTest(fixtures.TestBase): __only_on__ = 'postgresql' + __backend__ = True @testing.fails_if("postgresql < 8.4", - "Better int2vector functions not available") + "Better int2vector functions not available") @testing.provide_metadata def test_reflected_primary_key_order(self): meta1 = self.metadata @@ -147,28 +144,32 @@ class ReflectionTest(fixtures.TestBase): def test_pg_weirdchar_reflection(self): meta1 = self.metadata subject = Table('subject', meta1, Column('id$', Integer, - primary_key=True)) - referer = Table('referer', meta1, Column('id', Integer, - primary_key=True), Column('ref', Integer, - ForeignKey('subject.id$'))) + primary_key=True)) + referer = Table( + 'referer', meta1, + Column( + 'id', Integer, primary_key=True), + Column( + 'ref', Integer, ForeignKey('subject.id$'))) meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True) referer = Table('referer', meta2, autoload=True) self.assert_((subject.c['id$'] - == referer.c.ref).compare( - subject.join(referer).onclause)) + == referer.c.ref).compare( + subject.join(referer).onclause)) @testing.provide_metadata def test_reflect_default_over_128_chars(self): Table('t', self.metadata, - Column('x', String(200), server_default="abcd" * 40) - ).create(testing.db) + Column('x', String(200), server_default="abcd" * 40) + ).create(testing.db) m = MetaData() t = Table('t', m, autoload=True, autoload_with=testing.db) eq_( - t.c.x.server_default.arg.text, "'%s'::character varying" % ("abcd" * 40) + t.c.x.server_default.arg.text, "'%s'::character varying" % ( + "abcd" * 40) ) @testing.fails_if("postgresql < 8.1", "schema name leaks in, not sure") @@ -184,8 +185,8 @@ class ReflectionTest(fixtures.TestBase): r = t2.insert().execute() eq_(r.inserted_primary_key, [1]) testing.db.connect().execution_options(autocommit=True).\ - execute('alter table t_id_seq rename to foobar_id_seq' - ) + execute('alter table t_id_seq rename to foobar_id_seq' + ) m3 = MetaData(testing.db) t3 = Table('t', m3, autoload=True, implicit_returning=False) eq_(t3.c.id.server_default.arg.text, @@ -210,14 +211,18 @@ class ReflectionTest(fixtures.TestBase): meta1 = self.metadata users = Table('users', meta1, - Column('user_id', Integer, primary_key=True), - Column('user_name', String(30), nullable=False), - schema='test_schema') - addresses = Table('email_addresses', meta1, - Column('address_id', Integer, primary_key=True), - Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), - Column('email_address', String(20)), - schema='test_schema') + Column('user_id', Integer, primary_key=True), + Column('user_name', String(30), nullable=False), + schema='test_schema') + addresses = Table( + 'email_addresses', meta1, + Column( + 'address_id', Integer, primary_key=True), + Column( + 'remote_user_id', Integer, ForeignKey( + users.c.user_id)), + Column( + 'email_address', String(20)), schema='test_schema') meta1.create_all() meta2 = MetaData(testing.db) addresses = Table('email_addresses', meta2, autoload=True, @@ -226,25 +231,25 @@ class ReflectionTest(fixtures.TestBase): schema='test_schema') j = join(users, addresses) self.assert_((users.c.user_id - == addresses.c.remote_user_id).compare(j.onclause)) + == addresses.c.remote_user_id).compare(j.onclause)) @testing.provide_metadata def test_cross_schema_reflection_two(self): meta1 = self.metadata subject = Table('subject', meta1, - Column('id', Integer, primary_key=True)) + Column('id', Integer, primary_key=True)) referer = Table('referer', meta1, - Column('id', Integer, primary_key=True), - Column('ref', Integer, ForeignKey('subject.id')), - schema='test_schema') + Column('id', Integer, primary_key=True), + Column('ref', Integer, ForeignKey('subject.id')), + schema='test_schema') meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True) referer = Table('referer', meta2, schema='test_schema', autoload=True) self.assert_((subject.c.id - == referer.c.ref).compare( - subject.join(referer).onclause)) + == referer.c.ref).compare( + subject.join(referer).onclause)) @testing.provide_metadata def test_cross_schema_reflection_three(self): @@ -252,10 +257,18 @@ class ReflectionTest(fixtures.TestBase): subject = Table('subject', meta1, Column('id', Integer, primary_key=True), schema='test_schema_2') - referer = Table('referer', meta1, - Column('id', Integer, primary_key=True), - Column('ref', Integer, ForeignKey('test_schema_2.subject.id')), - schema='test_schema') + referer = Table( + 'referer', + meta1, + Column( + 'id', + Integer, + primary_key=True), + Column( + 'ref', + Integer, + ForeignKey('test_schema_2.subject.id')), + schema='test_schema') meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True, @@ -263,8 +276,8 @@ class ReflectionTest(fixtures.TestBase): referer = Table('referer', meta2, autoload=True, schema='test_schema') self.assert_((subject.c.id - == referer.c.ref).compare( - subject.join(referer).onclause)) + == referer.c.ref).compare( + subject.join(referer).onclause)) @testing.provide_metadata def test_cross_schema_reflection_four(self): @@ -272,10 +285,18 @@ class ReflectionTest(fixtures.TestBase): subject = Table('subject', meta1, Column('id', Integer, primary_key=True), schema='test_schema_2') - referer = Table('referer', meta1, - Column('id', Integer, primary_key=True), - Column('ref', Integer, ForeignKey('test_schema_2.subject.id')), - schema='test_schema') + referer = Table( + 'referer', + meta1, + Column( + 'id', + Integer, + primary_key=True), + Column( + 'ref', + Integer, + ForeignKey('test_schema_2.subject.id')), + schema='test_schema') meta1.create_all() conn = testing.db.connect() @@ -289,8 +310,8 @@ class ReflectionTest(fixtures.TestBase): schema='test_schema', postgresql_ignore_search_path=True) self.assert_((subject.c.id - == referer.c.ref).compare( - subject.join(referer).onclause)) + == referer.c.ref).compare( + subject.join(referer).onclause)) conn.close() @testing.provide_metadata @@ -317,8 +338,8 @@ class ReflectionTest(fixtures.TestBase): ) assert subject.schema == default_schema self.assert_((subject.c.id - == referer.c.ref).compare( - subject.join(referer).onclause)) + == referer.c.ref).compare( + subject.join(referer).onclause)) @testing.provide_metadata def test_cross_schema_reflection_six(self): @@ -327,60 +348,60 @@ class ReflectionTest(fixtures.TestBase): meta1 = self.metadata Table('some_table', meta1, - Column('id', Integer, primary_key=True), - schema='test_schema' - ) + Column('id', Integer, primary_key=True), + schema='test_schema' + ) Table('some_other_table', meta1, - Column('id', Integer, primary_key=True), - Column('sid', Integer, ForeignKey('test_schema.some_table.id')), - schema='test_schema_2' - ) + Column('id', Integer, primary_key=True), + Column('sid', Integer, ForeignKey('test_schema.some_table.id')), + schema='test_schema_2' + ) meta1.create_all() with testing.db.connect() as conn: conn.detach() - conn.execute("set search_path to test_schema_2, test_schema, public") + conn.execute( + "set search_path to test_schema_2, test_schema, public") m1 = MetaData(conn) t1_schema = Table('some_table', - m1, - schema="test_schema", - autoload=True) + m1, + schema="test_schema", + autoload=True) t2_schema = Table('some_other_table', - m1, - schema="test_schema_2", - autoload=True) + m1, + schema="test_schema_2", + autoload=True) t2_no_schema = Table('some_other_table', - m1, - autoload=True) + m1, + autoload=True) t1_no_schema = Table('some_table', - m1, - autoload=True) + m1, + autoload=True) m2 = MetaData(conn) t1_schema_isp = Table('some_table', - m2, - schema="test_schema", - autoload=True, - postgresql_ignore_search_path=True) + m2, + schema="test_schema", + autoload=True, + postgresql_ignore_search_path=True) t2_schema_isp = Table('some_other_table', - m2, - schema="test_schema_2", - autoload=True, - postgresql_ignore_search_path=True) - + m2, + schema="test_schema_2", + autoload=True, + postgresql_ignore_search_path=True) # t2_schema refers to t1_schema, but since "test_schema" # is in the search path, we instead link to t2_no_schema assert t2_schema.c.sid.references( - t1_no_schema.c.id) + t1_no_schema.c.id) # the two no_schema tables refer to each other also. assert t2_no_schema.c.sid.references( - t1_no_schema.c.id) + t1_no_schema.c.id) # but if we're ignoring search path, then we maintain # those explicit schemas vs. what the "default" schema is @@ -393,30 +414,32 @@ class ReflectionTest(fixtures.TestBase): meta1 = self.metadata Table('some_table', meta1, - Column('id', Integer, primary_key=True), - schema='test_schema' - ) + Column('id', Integer, primary_key=True), + schema='test_schema' + ) Table('some_other_table', meta1, - Column('id', Integer, primary_key=True), - Column('sid', Integer, ForeignKey('test_schema.some_table.id')), - schema='test_schema_2' - ) + Column('id', Integer, primary_key=True), + Column('sid', Integer, ForeignKey('test_schema.some_table.id')), + schema='test_schema_2' + ) meta1.create_all() with testing.db.connect() as conn: conn.detach() - conn.execute("set search_path to test_schema_2, test_schema, public") + conn.execute( + "set search_path to test_schema_2, test_schema, public") meta2 = MetaData(conn) meta2.reflect(schema="test_schema_2") - eq_(set(meta2.tables), set(['test_schema_2.some_other_table', 'some_table'])) + eq_(set(meta2.tables), set( + ['test_schema_2.some_other_table', 'some_table'])) meta3 = MetaData(conn) - meta3.reflect(schema="test_schema_2", postgresql_ignore_search_path=True) - - eq_(set(meta3.tables), - set(['test_schema_2.some_other_table', 'test_schema.some_table'])) + meta3.reflect( + schema="test_schema_2", postgresql_ignore_search_path=True) + eq_(set(meta3.tables), set( + ['test_schema_2.some_other_table', 'test_schema.some_table'])) @testing.provide_metadata def test_uppercase_lowercase_table(self): @@ -445,7 +468,6 @@ class ReflectionTest(fixtures.TestBase): a_seq.drop(testing.db) A_seq.drop(testing.db) - @testing.provide_metadata def test_index_reflection(self): """ Reflecting partial & expression-based indexes should warn @@ -453,9 +475,14 @@ class ReflectionTest(fixtures.TestBase): metadata = self.metadata - t1 = Table('party', metadata, Column('id', String(10), - nullable=False), Column('name', String(20), - index=True), Column('aname', String(20))) + t1 = Table( + 'party', metadata, + Column( + 'id', String(10), nullable=False), + Column( + 'name', String(20), index=True), + Column( + 'aname', String(20))) metadata.create_all() testing.db.execute(""" create index idx1 on party ((id || name)) @@ -484,15 +511,14 @@ class ReflectionTest(fixtures.TestBase): assert [t2.c.id] == r1.columns assert [t2.c.name] == r2.columns - testing.assert_warnings(go, - [ - 'Skipped unsupported reflection of ' - 'expression-based index idx1', - 'Predicate of partial index idx2 ignored during ' - 'reflection', - 'Skipped unsupported reflection of ' - 'expression-based index idx3' - ]) + testing.assert_warnings( + go, + ['Skipped unsupported reflection of ' + 'expression-based index idx1', + 'Predicate of partial index idx2 ignored during ' + 'reflection', + 'Skipped unsupported reflection of ' + 'expression-based index idx3']) @testing.provide_metadata def test_index_reflection_modified(self): @@ -505,9 +531,9 @@ class ReflectionTest(fixtures.TestBase): metadata = self.metadata t1 = Table('t', metadata, - Column('id', Integer, primary_key=True), - Column('x', Integer) - ) + Column('id', Integer, primary_key=True), + Column('x', Integer) + ) metadata.create_all() conn = testing.db.connect().execution_options(autocommit=True) conn.execute("CREATE INDEX idx1 ON t (x)") @@ -520,30 +546,45 @@ class ReflectionTest(fixtures.TestBase): @testing.provide_metadata def test_foreign_key_option_inspection(self): metadata = self.metadata - Table('person', metadata, - Column('id', String(length=32), nullable=False, primary_key=True), - Column('company_id', ForeignKey('company.id', - name='person_company_id_fkey', - match='FULL', onupdate='RESTRICT', ondelete='RESTRICT', - deferrable=True, initially='DEFERRED' - ) - ) - ) - Table('company', metadata, + Table( + 'person', + metadata, + Column( + 'id', + String( + length=32), + nullable=False, + primary_key=True), + Column( + 'company_id', + ForeignKey( + 'company.id', + name='person_company_id_fkey', + match='FULL', + onupdate='RESTRICT', + ondelete='RESTRICT', + deferrable=True, + initially='DEFERRED'))) + Table( + 'company', metadata, Column('id', String(length=32), nullable=False, primary_key=True), Column('name', String(length=255)), - Column('industry_id', ForeignKey('industry.id', - name='company_industry_id_fkey', - onupdate='CASCADE', ondelete='CASCADE', - deferrable=False, # PG default - initially='IMMEDIATE' # PG default + Column( + 'industry_id', + ForeignKey( + 'industry.id', + name='company_industry_id_fkey', + onupdate='CASCADE', ondelete='CASCADE', + deferrable=False, # PG default + # PG default + initially='IMMEDIATE' ) ) ) Table('industry', metadata, - Column('id', Integer(), nullable=False, primary_key=True), - Column('name', String(length=255)) - ) + Column('id', Integer(), nullable=False, primary_key=True), + Column('name', String(length=255)) + ) fk_ref = { 'person_company_id_fkey': { 'name': 'person_company_id_fkey', @@ -557,8 +598,8 @@ class ReflectionTest(fixtures.TestBase): 'ondelete': 'RESTRICT', 'initially': 'DEFERRED', 'match': 'FULL' - } - }, + } + }, 'company_industry_id_fkey': { 'name': 'company_industry_id_fkey', 'constrained_columns': ['industry_id'], @@ -581,9 +622,11 @@ class ReflectionTest(fixtures.TestBase): for fk in fks: eq_(fk, fk_ref[fk['name']]) + class CustomTypeReflectionTest(fixtures.TestBase): class CustomType(object): + def __init__(self, arg1=None, arg2=None): self.arg1 = arg1 self.arg2 = arg2 diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index 87250d467..33219ce4c 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -1,62 +1,60 @@ # coding: utf-8 from sqlalchemy.testing.assertions import eq_, assert_raises, \ - assert_raises_message, is_, AssertsExecutionResults, \ - AssertsCompiledSQL, ComparesTables + assert_raises_message, is_, AssertsExecutionResults, \ + AssertsCompiledSQL, ComparesTables from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing import datetime -from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ - String, Sequence, ForeignKey, join, Numeric, \ - PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ - func, literal_column, literal, bindparam, cast, extract, \ - SmallInteger, Enum, REAL, update, insert, Index, delete, \ - and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text, \ - type_coerce -from sqlalchemy.orm import Session, mapper, aliased -from sqlalchemy import exc, schema, types +from sqlalchemy import Table, MetaData, Column, Integer, Enum, Float, select, \ + func, DateTime, Numeric, exc, String, cast, REAL, TypeDecorator, Unicode, \ + Text +from sqlalchemy.sql import operators +from sqlalchemy import types from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \ - INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \ - JSON, JSONB + INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \ + JSON, JSONB import decimal from sqlalchemy import util from sqlalchemy.testing.util import round_decimal -from sqlalchemy.sql import table, column, operators -import logging -import re from sqlalchemy import inspect from sqlalchemy import event +tztable = notztable = metadata = table = None + + class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = 'postgresql' __dialect__ = postgresql.dialect() + __backend__ = True @classmethod def define_tables(cls, metadata): data_table = Table('data_table', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer) - ) + Column('id', Integer, primary_key=True), + Column('data', Integer) + ) @classmethod def insert_data(cls): data_table = cls.tables.data_table data_table.insert().execute( - {'data':3}, - {'data':5}, - {'data':7}, - {'data':2}, - {'data':15}, - {'data':12}, - {'data':6}, - {'data':478}, - {'data':52}, - {'data':9}, - ) - - @testing.fails_on('postgresql+zxjdbc', - 'XXX: postgresql+zxjdbc currently returns a Decimal result for Float') + {'data': 3}, + {'data': 5}, + {'data': 7}, + {'data': 2}, + {'data': 15}, + {'data': 12}, + {'data': 6}, + {'data': 478}, + {'data': 52}, + {'data': 9}, + ) + + @testing.fails_on( + 'postgresql+zxjdbc', + 'XXX: postgresql+zxjdbc currently returns a Decimal result for Float') def test_float_coercion(self): data_table = self.tables.data_table @@ -87,11 +85,11 @@ class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): def test_arrays(self): metadata = self.metadata t1 = Table('t', metadata, - Column('x', postgresql.ARRAY(Float)), - Column('y', postgresql.ARRAY(REAL)), - Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)), - Column('q', postgresql.ARRAY(Numeric)) - ) + Column('x', postgresql.ARRAY(Float)), + Column('y', postgresql.ARRAY(REAL)), + Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)), + Column('q', postgresql.ARRAY(Numeric)) + ) metadata.create_all() t1.insert().execute(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]) row = t1.select().execute().first() @@ -100,19 +98,24 @@ class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): ([5], [5], [6], [decimal.Decimal("6.4")]) ) + class EnumTest(fixtures.TestBase, AssertsExecutionResults): + __backend__ = True __only_on__ = 'postgresql > 8.3' - @testing.fails_on('postgresql+zxjdbc', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type character varying') def test_create_table(self): metadata = MetaData(testing.db) - t1 = Table('table', metadata, Column('id', Integer, - primary_key=True), Column('value', Enum('one', 'two' - , 'three', name='onetwothreetype'))) + t1 = Table( + 'table', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'value', Enum( + 'one', 'two', 'three', name='onetwothreetype'))) t1.create() t1.create(checkfirst=True) # check the create try: @@ -138,20 +141,28 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): @testing.provide_metadata def test_unicode_labels(self): metadata = self.metadata - t1 = Table('table', metadata, - Column('id', Integer, primary_key=True), - Column('value', - Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'), - name='onetwothreetype')) - ) + t1 = Table( + 'table', + metadata, + Column( + 'id', + Integer, + primary_key=True), + Column( + 'value', + Enum( + util.u('réveillé'), + util.u('drôle'), + util.u('S’il'), + name='onetwothreetype'))) metadata.create_all() t1.insert().execute(value=util.u('drôle')) t1.insert().execute(value=util.u('réveillé')) t1.insert().execute(value=util.u('S’il')) eq_(t1.select().order_by(t1.c.id).execute().fetchall(), [(1, util.u('drôle')), (2, util.u('réveillé')), - (3, util.u('S’il'))] - ) + (3, util.u('S’il'))] + ) m2 = MetaData(testing.db) t2 = Table('table', m2, autoload=True) eq_( @@ -161,17 +172,27 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): def test_non_native_type(self): metadata = MetaData() - t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', - 'three', name='myenum', native_enum=False))) + t1 = Table( + 'foo', + metadata, + Column( + 'bar', + Enum( + 'one', + 'two', + 'three', + name='myenum', + native_enum=False))) def go(): t1.create(testing.db) try: - self.assert_sql(testing.db, go, [], - with_sequences=[("CREATE TABLE foo (\tbar " - "VARCHAR(5), \tCONSTRAINT myenum CHECK " - "(bar IN ('one', 'two', 'three')))", {})]) + self.assert_sql( + testing.db, go, [], with_sequences=[ + ("CREATE TABLE foo (\tbar " + "VARCHAR(5), \tCONSTRAINT myenum CHECK " + "(bar IN ('one', 'two', 'three')))", {})]) finally: metadata.drop_all(testing.db) @@ -180,12 +201,12 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): metadata = self.metadata e1 = postgresql.ENUM('one', 'two', 'three', - name="myenum", - create_type=False) + name="myenum", + create_type=False) t1 = Table('e1', metadata, - Column('c1', e1) - ) + Column('c1', e1) + ) # table can be created separately # without conflict e1.create(bind=testing.db) @@ -205,14 +226,14 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): metadata = self.metadata e1 = Enum('one', 'two', 'three', - name="myenum") + name="myenum") t1 = Table('e1', metadata, - Column('c1', e1) - ) + Column('c1', e1) + ) t2 = Table('e2', metadata, - Column('c1', e1) - ) + Column('c1', e1) + ) metadata.create_all(checkfirst=False) metadata.drop_all(checkfirst=False) @@ -222,17 +243,26 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): engine.connect() engine.dialect.supports_native_enum = False metadata = MetaData() - t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', - 'three', name='myenum'))) + t1 = Table( + 'foo', + metadata, + Column( + 'bar', + Enum( + 'one', + 'two', + 'three', + name='myenum'))) def go(): t1.create(engine) try: - self.assert_sql(engine, go, [], - with_sequences=[("CREATE TABLE foo (\tbar " - "VARCHAR(5), \tCONSTRAINT myenum CHECK " - "(bar IN ('one', 'two', 'three')))", {})]) + self.assert_sql( + engine, go, [], with_sequences=[ + ("CREATE TABLE foo (\tbar " + "VARCHAR(5), \tCONSTRAINT myenum CHECK " + "(bar IN ('one', 'two', 'three')))", {})]) finally: metadata.drop_all(engine) @@ -243,19 +273,19 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): etype.create() try: assert testing.db.dialect.has_type(testing.db, - 'fourfivesixtype') + 'fourfivesixtype') finally: etype.drop() assert not testing.db.dialect.has_type(testing.db, - 'fourfivesixtype') + 'fourfivesixtype') metadata.create_all() try: assert testing.db.dialect.has_type(testing.db, - 'fourfivesixtype') + 'fourfivesixtype') finally: metadata.drop_all() assert not testing.db.dialect.has_type(testing.db, - 'fourfivesixtype') + 'fourfivesixtype') def test_no_support(self): def server_version_info(self): @@ -274,15 +304,18 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): e.connect() assert not dialect.supports_native_enum - def test_reflection(self): metadata = MetaData(testing.db) etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata) - t1 = Table('table', metadata, Column('id', Integer, - primary_key=True), Column('value', Enum('one', 'two' - , 'three', name='onetwothreetype')), Column('value2' - , etype)) + t1 = Table( + 'table', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'value', Enum( + 'one', 'two', 'three', name='onetwothreetype')), + Column('value2', etype)) metadata.create_all() try: m2 = MetaData(testing.db) @@ -303,11 +336,16 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): name='fourfivesixtype', schema='test_schema', metadata=metadata, - ) - t1 = Table('table', metadata, Column('id', Integer, - primary_key=True), Column('value', Enum('one', 'two' - , 'three', name='onetwothreetype', - schema='test_schema')), Column('value2', etype)) + ) + t1 = Table( + 'table', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'value', Enum( + 'one', 'two', 'three', + name='onetwothreetype', schema='test_schema')), + Column('value2', etype)) metadata.create_all() try: m2 = MetaData(testing.db) @@ -320,21 +358,25 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults): finally: metadata.drop_all() + class OIDTest(fixtures.TestBase): __only_on__ = 'postgresql' + __backend__ = True @testing.provide_metadata def test_reflection(self): metadata = self.metadata Table('table', metadata, Column('x', Integer), - Column('y', postgresql.OID)) + Column('y', postgresql.OID)) metadata.create_all() m2 = MetaData() t2 = Table('table', m2, autoload_with=testing.db, autoload=True) assert isinstance(t2.c.y.type, postgresql.OID) + class NumericInterpretationTest(fixtures.TestBase): __only_on__ = 'postgresql' + __backend__ = True def test_numeric_codes(self): from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base @@ -343,7 +385,7 @@ class NumericInterpretationTest(fixtures.TestBase): typ = Numeric().dialect_impl(dialect) for code in base._INT_TYPES + base._FLOAT_TYPES + \ - base._DECIMAL_TYPES: + base._DECIMAL_TYPES: proc = typ.result_processor(dialect, code) val = 23.7 if proc is not None: @@ -355,13 +397,13 @@ class NumericInterpretationTest(fixtures.TestBase): metadata = self.metadata # pg8000 appears to fail when the value is 0, # returns an int instead of decimal. - t =Table('t', metadata, - Column('id', Integer, primary_key=True), - Column('nd', Numeric(asdecimal=True), default=1), - Column('nf', Numeric(asdecimal=False), default=1), - Column('fd', Float(asdecimal=True), default=1), - Column('ff', Float(asdecimal=False), default=1), - ) + t = Table('t', metadata, + Column('id', Integer, primary_key=True), + Column('nd', Numeric(asdecimal=True), default=1), + Column('nf', Numeric(asdecimal=False), default=1), + Column('fd', Float(asdecimal=True), default=1), + Column('ff', Float(asdecimal=False), default=1), + ) metadata.create_all() r = t.insert().execute() @@ -375,7 +417,9 @@ class NumericInterpretationTest(fixtures.TestBase): (1, decimal.Decimal("1"), 1, decimal.Decimal("1"), 1) ) + class TimezoneTest(fixtures.TestBase): + __backend__ = True """Test timezone-aware datetimes. @@ -395,17 +439,24 @@ class TimezoneTest(fixtures.TestBase): # current_timestamp() in postgresql is assumed to return # TIMESTAMP WITH TIMEZONE - tztable = Table('tztable', metadata, Column('id', Integer, - primary_key=True), Column('date', - DateTime(timezone=True), - onupdate=func.current_timestamp()), - Column('name', String(20))) - notztable = Table('notztable', metadata, Column('id', Integer, - primary_key=True), Column('date', - DateTime(timezone=False), - onupdate=cast(func.current_timestamp(), - DateTime(timezone=False))), Column('name', - String(20))) + tztable = Table( + 'tztable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'date', DateTime( + timezone=True), onupdate=func.current_timestamp()), + Column('name', String(20))) + notztable = Table( + 'notztable', metadata, + Column( + 'id', Integer, primary_key=True), + Column( + 'date', DateTime( + timezone=False), onupdate=cast( + func.current_timestamp(), DateTime( + timezone=False))), + Column('name', String(20))) metadata.create_all() @classmethod @@ -429,8 +480,8 @@ class TimezoneTest(fixtures.TestBase): row[0].tzinfo.utcoffset(row[0])) result = tztable.update(tztable.c.id == 1).returning(tztable.c.date).\ - execute(name='newname' - ) + execute(name='newname' + ) row = result.first() assert row[0] >= somedate @@ -438,7 +489,7 @@ class TimezoneTest(fixtures.TestBase): # get a date without a tzinfo - somedate = datetime.datetime( 2005, 10, 20, 11, 52, 0, ) + somedate = datetime.datetime(2005, 10, 20, 11, 52, 0, ) assert not somedate.tzinfo notztable.insert().execute(id=1, name='row1', date=somedate) row = select([notztable.c.date], notztable.c.id @@ -447,15 +498,17 @@ class TimezoneTest(fixtures.TestBase): eq_(row[0].tzinfo, None) result = notztable.update(notztable.c.id == 1).returning(notztable.c.date).\ - execute(name='newname' - ) + execute(name='newname' + ) row = result.first() assert row[0] >= somedate + class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = postgresql.dialect() __prefer__ = 'postgresql' + __backend__ = True def test_compile(self): for type_, expected in [ @@ -469,7 +522,7 @@ class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): 'TIMESTAMP(5) WITHOUT TIME ZONE'), (postgresql.TIMESTAMP(timezone=True, precision=5), 'TIMESTAMP(5) WITH TIME ZONE'), - ]: + ]: self.assert_compile(type_, expected) @testing.only_on('postgresql', 'DB specific feature') @@ -485,8 +538,8 @@ class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): Column('c4', postgresql.TIMESTAMP()), Column('c5', postgresql.TIMESTAMP(precision=5)), Column('c6', postgresql.TIMESTAMP(timezone=True, - precision=5)), - ) + precision=5)), + ) t1.create() m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) @@ -503,10 +556,11 @@ class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): eq_(t2.c.c5.type.timezone, False) eq_(t2.c.c6.type.timezone, True) + class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = 'postgresql' - + __backend__ = True __unsupported_on__ = 'postgresql+pg8000', 'postgresql+zxjdbc' @classmethod @@ -532,23 +586,23 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): ] Table('arrtable', metadata, - Column('id', Integer, primary_key=True), - Column('intarr', postgresql.ARRAY(Integer)), - Column('strarr', postgresql.ARRAY(Unicode())), - Column('dimarr', ProcValue) - ) + Column('id', Integer, primary_key=True), + Column('intarr', postgresql.ARRAY(Integer)), + Column('strarr', postgresql.ARRAY(Unicode())), + Column('dimarr', ProcValue) + ) Table('dim_arrtable', metadata, - Column('id', Integer, primary_key=True), - Column('intarr', postgresql.ARRAY(Integer, dimensions=1)), - Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)), - Column('dimarr', ProcValue) - ) + Column('id', Integer, primary_key=True), + Column('intarr', postgresql.ARRAY(Integer, dimensions=1)), + Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)), + Column('dimarr', ProcValue) + ) def _fixture_456(self, table): testing.db.execute( - table.insert(), - intarr=[4, 5, 6] + table.insert(), + intarr=[4, 5, 6] ) def test_reflect_array_column(self): @@ -562,7 +616,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_insert_array(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), - util.u('def')]) + util.u('def')]) results = arrtable.select().execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1, 2, 3]) @@ -571,9 +625,12 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_array_where(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), - util.u('def')]) + util.u('def')]) arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC')) - results = arrtable.select().where(arrtable.c.intarr == [1, 2, + results = arrtable.select().where( + arrtable.c.intarr == [ + 1, + 2, 3]).execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1, 2, 3]) @@ -581,24 +638,26 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_array_concat(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], - strarr=[util.u('abc'), util.u('def')]) + strarr=[util.u('abc'), util.u('def')]) results = select([arrtable.c.intarr + [4, 5, - 6]]).execute().fetchall() + 6]]).execute().fetchall() eq_(len(results), 1) - eq_(results[0][0], [ 1, 2, 3, 4, 5, 6, ]) + eq_(results[0][0], [1, 2, 3, 4, 5, 6, ]) def test_array_subtype_resultprocessor(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[4, 5, 6], strarr=[[util.ue('m\xe4\xe4')], [ - util.ue('m\xf6\xf6')]]) + util.ue('m\xf6\xf6')]]) arrtable.insert().execute(intarr=[1, 2, 3], strarr=[ - util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) + util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) results = \ arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall() eq_(len(results), 2) eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) - eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]]) + eq_(results[1]['strarr'], + [[util.ue('m\xe4\xe4')], + [util.ue('m\xf6\xf6')]]) def test_array_literal(self): eq_( @@ -606,7 +665,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): select([ postgresql.array([1, 2]) + postgresql.array([3, 4, 5]) ]) - ), [1,2,3,4,5] + ), [1, 2, 3, 4, 5] ) def test_array_getitem_single_type(self): @@ -640,7 +699,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): eq_( testing.db.scalar( select([arrtable.c.intarr]). - where(arrtable.c.intarr.contains([])) + where(arrtable.c.intarr.contains([])) ), [4, 5, 6] ) @@ -664,14 +723,13 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): [7, 8] ) - def _test_undim_array_contains_typed_exec(self, struct): arrtable = self.tables.arrtable self._fixture_456(arrtable) eq_( testing.db.scalar( select([arrtable.c.intarr]). - where(arrtable.c.intarr.contains(struct([4, 5]))) + where(arrtable.c.intarr.contains(struct([4, 5]))) ), [4, 5, 6] ) @@ -684,7 +742,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_undim_array_contains_generator_exec(self): self._test_undim_array_contains_typed_exec( - lambda elem: (x for x in elem)) + lambda elem: (x for x in elem)) def _test_dim_array_contains_typed_exec(self, struct): dim_arrtable = self.tables.dim_arrtable @@ -692,7 +750,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): eq_( testing.db.scalar( select([dim_arrtable.c.intarr]). - where(dim_arrtable.c.intarr.contains(struct([4, 5]))) + where(dim_arrtable.c.intarr.contains(struct([4, 5]))) ), [4, 5, 6] ) @@ -704,7 +762,9 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): self._test_dim_array_contains_typed_exec(list) def test_dim_array_contains_generator_exec(self): - self._test_dim_array_contains_typed_exec(lambda elem: (x for x in elem)) + self._test_dim_array_contains_typed_exec( + lambda elem: ( + x for x in elem)) def test_array_contained_by_exec(self): arrtable = self.tables.arrtable @@ -730,7 +790,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): eq_( conn.scalar( select([arrtable.c.intarr]). - where(arrtable.c.intarr.overlap([7, 6])) + where(arrtable.c.intarr.overlap([7, 6])) ), [4, 5, 6] ) @@ -745,7 +805,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): eq_( conn.scalar( select([arrtable.c.intarr]). - where(postgresql.Any(5, arrtable.c.intarr)) + where(postgresql.Any(5, arrtable.c.intarr)) ), [4, 5, 6] ) @@ -760,26 +820,40 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): eq_( conn.scalar( select([arrtable.c.intarr]). - where(arrtable.c.intarr.all(4, operator=operators.le)) + where(arrtable.c.intarr.all(4, operator=operators.le)) ), [4, 5, 6] ) - @testing.provide_metadata def test_tuple_flag(self): metadata = self.metadata - t1 = Table('t1', metadata, + t1 = Table( + 't1', metadata, Column('id', Integer, primary_key=True), Column('data', postgresql.ARRAY(String(5), as_tuple=True)), - Column('data2', postgresql.ARRAY(Numeric(asdecimal=False), as_tuple=True)), + Column( + 'data2', + postgresql.ARRAY( + Numeric(asdecimal=False), as_tuple=True) + ) ) metadata.create_all() - testing.db.execute(t1.insert(), id=1, data=["1","2","3"], data2=[5.4, 5.6]) - testing.db.execute(t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0]) + testing.db.execute( + t1.insert(), id=1, data=[ + "1", "2", "3"], data2=[ + 5.4, 5.6]) + testing.db.execute( + t1.insert(), + id=2, + data=[ + "4", + "5", + "6"], + data2=[1.0]) testing.db.execute(t1.insert(), id=3, data=[["4", "5"], ["6", "7"]], - data2=[[5.4, 5.6], [1.0, 1.1]]) + data2=[[5.4, 5.6], [1.0, 1.1]]) r = testing.db.execute(t1.select().order_by(t1.c.id)).fetchall() eq_( @@ -798,14 +872,16 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_dimension(self): arrtable = self.tables.arrtable - testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4,5, 6]]) + testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4, 5, 6]]) eq_( testing.db.scalar(select([arrtable.c.dimarr])), [[-1, 0, 1], [2, 3, 4]] ) + class TimestampTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' + __backend__ = True def test_timestamp(self): engine = testing.db @@ -817,10 +893,12 @@ class TimestampTest(fixtures.TestBase, AssertsExecutionResults): class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): + """test DDL and reflection of PG-specific types """ __only_on__ = 'postgresql' __excluded_on__ = (('postgresql', '<', (8, 3, 0)),) + __backend__ = True @classmethod def setup_class(cls): @@ -830,17 +908,23 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): # create these types so that we can issue # special SQL92 INTERVAL syntax class y2m(types.UserDefinedType, postgresql.INTERVAL): + def get_col_spec(self): return "INTERVAL YEAR TO MONTH" class d2s(types.UserDefinedType, postgresql.INTERVAL): + def get_col_spec(self): return "INTERVAL DAY TO SECOND" - table = Table('sometable', metadata, - Column('id', postgresql.UUID, primary_key=True), - Column('flag', postgresql.BIT), - Column('bitstring', postgresql.BIT(4)), + table = Table( + 'sometable', metadata, + Column( + 'id', postgresql.UUID, primary_key=True), + Column( + 'flag', postgresql.BIT), + Column( + 'bitstring', postgresql.BIT(4)), Column('addr', postgresql.INET), Column('addr2', postgresql.MACADDR), Column('addr3', postgresql.CIDR), @@ -848,9 +932,9 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): Column('plain_interval', postgresql.INTERVAL), Column('year_interval', y2m()), Column('month_interval', d2s()), - Column('precision_interval', postgresql.INTERVAL(precision=3)), - Column('tsvector_document', postgresql.TSVECTOR) - ) + Column('precision_interval', postgresql.INTERVAL( + precision=3)), + Column('tsvector_document', postgresql.TSVECTOR)) metadata.create_all() @@ -877,7 +961,7 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): (postgresql.BIT(5), 'BIT(5)'), (postgresql.BIT(varying=True), 'BIT VARYING'), (postgresql.BIT(5, varying=True), 'BIT VARYING(5)'), - ] + ] for type_, expected in pairs: self.assert_compile(type_, expected) @@ -890,17 +974,18 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): testing.db.execute(t.update(), data="'a' 'cat' 'fat' 'mat' 'sat'") - eq_(testing.db.scalar(select([t.c.data])), "'a' 'cat' 'fat' 'mat' 'sat'") + eq_(testing.db.scalar(select([t.c.data])), + "'a' 'cat' 'fat' 'mat' 'sat'") @testing.provide_metadata def test_bit_reflection(self): metadata = self.metadata t1 = Table('t1', metadata, - Column('bit1', postgresql.BIT()), - Column('bit5', postgresql.BIT(5)), - Column('bitvarying', postgresql.BIT(varying=True)), - Column('bitvarying5', postgresql.BIT(5, varying=True)), - ) + Column('bit1', postgresql.BIT()), + Column('bit5', postgresql.BIT(5)), + Column('bitvarying', postgresql.BIT(varying=True)), + Column('bitvarying5', postgresql.BIT(5, varying=True)), + ) t1.create() m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) @@ -913,33 +998,40 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): eq_(t2.c.bitvarying5.type.length, 5) eq_(t2.c.bitvarying5.type.varying, True) + class UUIDTest(fixtures.TestBase): + """Test the bind/return values of the UUID type.""" __only_on__ = 'postgresql >= 8.3' + __backend__ = True - @testing.fails_on('postgresql+zxjdbc', - 'column "data" is of type uuid but expression is of type character varying') + @testing.fails_on( + 'postgresql+zxjdbc', + 'column "data" is of type uuid but expression ' + 'is of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') def test_uuid_string(self): import uuid self._test_round_trip( Table('utable', MetaData(), - Column('data', postgresql.UUID()) - ), + Column('data', postgresql.UUID()) + ), str(uuid.uuid4()), str(uuid.uuid4()) ) - @testing.fails_on('postgresql+zxjdbc', - 'column "data" is of type uuid but expression is of type character varying') + @testing.fails_on( + 'postgresql+zxjdbc', + 'column "data" is of type uuid but expression is ' + 'of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') def test_uuid_uuid(self): import uuid self._test_round_trip( Table('utable', MetaData(), - Column('data', postgresql.UUID(as_uuid=True)) - ), + Column('data', postgresql.UUID(as_uuid=True)) + ), uuid.uuid4(), uuid.uuid4() ) @@ -965,26 +1057,25 @@ class UUIDTest(fixtures.TestBase): def _test_round_trip(self, utable, value1, value2): utable.create(self.conn) - self.conn.execute(utable.insert(), {'data':value1}) - self.conn.execute(utable.insert(), {'data':value2}) + self.conn.execute(utable.insert(), {'data': value1}) + self.conn.execute(utable.insert(), {'data': value2}) r = self.conn.execute( - select([utable.c.data]). - where(utable.c.data != value1) - ) + select([utable.c.data]). + where(utable.c.data != value1) + ) eq_(r.fetchone()[0], value2) eq_(r.fetchone(), None) - class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): __dialect__ = 'postgresql' def setup(self): metadata = MetaData() self.test_table = Table('test_table', metadata, - Column('id', Integer, primary_key=True), - Column('hash', HSTORE) - ) + Column('id', Integer, primary_key=True), + Column('hash', HSTORE) + ) self.hashcol = self.test_table.c.hash def _test_where(self, whereclause, expected): @@ -1025,20 +1116,20 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): def test_parse_error(self): dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( - dialect, None) + dialect, None) assert_raises_message( ValueError, r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse ''' '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''', proc, '"key2"=>"value2", "key1"=>"value1", ' - 'crapcrapcrap, "key3"=>"value3"' + 'crapcrapcrap, "key3"=>"value3"' ) def test_result_deserialize_default(self): dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( - dialect, None) + dialect, None) eq_( proc('"key2"=>"value2", "key1"=>"value1"'), {"key1": "value1", "key2": "value2"} @@ -1047,7 +1138,7 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): def test_result_deserialize_with_slashes_and_quotes(self): dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( - dialect, None) + dialect, None) eq_( proc('"\\\\\\"a"=>"\\\\\\"1"'), {'\\"a': '\\"1'} @@ -1075,13 +1166,13 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = True proc = self.test_table.c.hash.type._cached_result_processor( - dialect, None) + dialect, None) is_(proc, None) dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = False proc = self.test_table.c.hash.type._cached_result_processor( - dialect, None) + dialect, None) eq_( proc('"key2"=>"value2", "key1"=>"value1"'), {"key1": "value1", "key2": "value2"} @@ -1246,30 +1337,31 @@ class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): class HStoreRoundTripTest(fixtures.TablesTest): __requires__ = 'hstore', __dialect__ = 'postgresql' + __backend__ = True @classmethod def define_tables(cls, metadata): Table('data_table', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(30), nullable=False), - Column('data', HSTORE) - ) + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', HSTORE) + ) def _fixture_data(self, engine): data_table = self.tables.data_table engine.execute( - data_table.insert(), - {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, - {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, - {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, - {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, - {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}}, + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}}, ) def _assert_data(self, compare): data = testing.db.execute( select([self.tables.data_table.c.data]). - order_by(self.tables.data_table.c.name) + order_by(self.tables.data_table.c.name) ).fetchall() eq_([d for d, in data], compare) @@ -1282,7 +1374,9 @@ class HStoreRoundTripTest(fixtures.TablesTest): def _non_native_engine(self): if testing.against("postgresql+psycopg2"): - engine = engines.testing_engine(options=dict(use_native_hstore=False)) + engine = engines.testing_engine( + options=dict( + use_native_hstore=False)) else: engine = testing.db engine.connect() @@ -1316,17 +1410,17 @@ class HStoreRoundTripTest(fixtures.TablesTest): def _test_criterion(self, engine): data_table = self.tables.data_table result = engine.execute( - select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1') - ).first() + select([data_table.c.data]).where( + data_table.c.data['k1'] == 'r3v1')).first() eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) def _test_fixed_round_trip(self, engine): s = select([ - hstore( - array(['key1', 'key2', 'key3']), - array(['value1', 'value2', 'value3']) - ) - ]) + hstore( + array(['key1', 'key2', 'key3']), + array(['value1', 'value2', 'value3']) + ) + ]) eq_( engine.scalar(s), {"key1": "value1", "key2": "value2", "key3": "value3"} @@ -1357,6 +1451,7 @@ class HStoreRoundTripTest(fixtures.TablesTest): } ) + @testing.only_on("postgresql+psycopg2") def test_unicode_round_trip_python(self): engine = self._non_native_engine() self._test_unicode_round_trip(engine) @@ -1384,22 +1479,27 @@ class HStoreRoundTripTest(fixtures.TablesTest): def test_orm_round_trip(self): from sqlalchemy import orm + class Data(object): + def __init__(self, name, data): self.name = name self.data = data orm.mapper(Data, self.tables.data_table) s = orm.Session(testing.db) d = Data(name='r1', data={"key1": "value1", "key2": "value2", - "key3": "value3"}) + "key3": "value3"}) s.add(d) eq_( s.query(Data.data, Data).all(), [(d.data, d)] ) + + class _RangeTypeMixin(object): __requires__ = 'range_types', __dialect__ = 'postgresql+psycopg2' + __backend__ = True def extras(self): # done this way so we don't get ImportErrors with @@ -1412,8 +1512,8 @@ class _RangeTypeMixin(object): # no reason ranges shouldn't be primary keys, # so lets just use them as such table = Table('data_table', metadata, - Column('range', cls._col_type, primary_key=True), - ) + Column('range', cls._col_type, primary_key=True), + ) cls.col = table.c.range def test_actual_type(self): @@ -1454,13 +1554,13 @@ class _RangeTypeMixin(object): def test_where_equal(self): self._test_clause( - self.col==self._data_str, + self.col == self._data_str, "data_table.range = %(range_1)s" ) def test_where_not_equal(self): self._test_clause( - self.col!=self._data_str, + self.col != self._data_str, "data_table.range <> %(range_1)s" ) @@ -1560,10 +1660,9 @@ class _RangeTypeMixin(object): range = self.tables.data_table.c.range data = testing.db.execute( select([range + range]) - ).fetchall() + ).fetchall() eq_(data, [(self._data_obj(), )]) - def test_intersection(self): self._test_clause( self.col * self.col, @@ -1580,7 +1679,7 @@ class _RangeTypeMixin(object): range = self.tables.data_table.c.range data = testing.db.execute( select([range * range]) - ).fetchall() + ).fetchall() eq_(data, [(self._data_obj(), )]) def test_different(self): @@ -1599,57 +1698,68 @@ class _RangeTypeMixin(object): range = self.tables.data_table.c.range data = testing.db.execute( select([range - range]) - ).fetchall() + ).fetchall() eq_(data, [(self._data_obj().__class__(empty=True), )]) + class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = INT4RANGE _col_str = 'INT4RANGE' _data_str = '[1,2)' + def _data_obj(self): return self.extras().NumericRange(1, 2) + class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = INT8RANGE _col_str = 'INT8RANGE' _data_str = '[9223372036854775806,9223372036854775807)' + def _data_obj(self): return self.extras().NumericRange( 9223372036854775806, 9223372036854775807 - ) + ) + class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = NUMRANGE _col_str = 'NUMRANGE' _data_str = '[1.0,2.0)' + def _data_obj(self): return self.extras().NumericRange( decimal.Decimal('1.0'), decimal.Decimal('2.0') - ) + ) + class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = DATERANGE _col_str = 'DATERANGE' _data_str = '[2013-03-23,2013-03-24)' + def _data_obj(self): return self.extras().DateRange( datetime.date(2013, 3, 23), datetime.date(2013, 3, 24) - ) + ) + class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = TSRANGE _col_str = 'TSRANGE' _data_str = '[2013-03-23 14:30,2013-03-23 23:30)' + def _data_obj(self): return self.extras().DateTimeRange( datetime.datetime(2013, 3, 23, 14, 30), datetime.datetime(2013, 3, 23, 23, 30) - ) + ) + class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest): @@ -1659,12 +1769,13 @@ class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest): # make sure we use one, steady timestamp with timezone pair # for all parts of all these tests _tstzs = None + def tstzs(self): if self._tstzs is None: lower = testing.db.connect().scalar( func.current_timestamp().select() - ) - upper = lower+datetime.timedelta(1) + ) + upper = lower + datetime.timedelta(1) self._tstzs = (lower, upper) return self._tstzs @@ -1682,9 +1793,9 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase): def setup(self): metadata = MetaData() self.test_table = Table('test_table', metadata, - Column('id', Integer, primary_key=True), - Column('test_column', JSON) - ) + Column('id', Integer, primary_key=True), + Column('test_column', JSON) + ) self.jsoncol = self.test_table.c.test_column def _test_where(self, whereclause, expected): @@ -1707,7 +1818,8 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase): def test_bind_serialize_default(self): dialect = postgresql.dialect() - proc = self.test_table.c.test_column.type._cached_bind_processor(dialect) + proc = self.test_table.c.test_column.type._cached_bind_processor( + dialect) eq_( proc({"A": [1, 2, 3, True, False]}), '{"A": [1, 2, 3, true, false]}' @@ -1716,13 +1828,14 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase): def test_result_deserialize_default(self): dialect = postgresql.dialect() proc = self.test_table.c.test_column.type._cached_result_processor( - dialect, None) + dialect, None) eq_( proc('{"A": [1, 2, 3, true, false]}'), {"A": [1, 2, 3, True, False]} ) - # This test is a bit misleading -- in real life you will need to cast to do anything + # This test is a bit misleading -- in real life you will need to cast to + # do anything def test_where_getitem(self): self._test_where( self.jsoncol['bar'] == None, @@ -1764,30 +1877,31 @@ class JSONTest(AssertsCompiledSQL, fixtures.TestBase): class JSONRoundTripTest(fixtures.TablesTest): __only_on__ = ('postgresql >= 9.3',) + __backend__ = True @classmethod def define_tables(cls, metadata): Table('data_table', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(30), nullable=False), - Column('data', JSON) - ) + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', JSON) + ) def _fixture_data(self, engine): data_table = self.tables.data_table engine.execute( - data_table.insert(), - {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, - {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, - {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, - {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, - {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}}, + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}}, ) def _assert_data(self, compare): data = testing.db.execute( select([self.tables.data_table.c.data]). - order_by(self.tables.data_table.c.name) + order_by(self.tables.data_table.c.name) ).fetchall() eq_([d for d, in data], compare) @@ -1810,9 +1924,11 @@ class JSONRoundTripTest(fixtures.TablesTest): if testing.against("postgresql+psycopg2"): from psycopg2.extras import register_default_json engine = engines.testing_engine(options=options) + @event.listens_for(engine, "connect") def connect(dbapi_connection, connection_record): engine.dialect._has_native_json = False + def pass_(value): return value register_default_json(dbapi_connection, loads=pass_) @@ -1837,9 +1953,9 @@ class JSONRoundTripTest(fixtures.TablesTest): engine = self._non_native_engine() self._test_insert(engine) - def _test_custom_serialize_deserialize(self, native): import json + def loads(value): value = json.loads(value) value['x'] = value['x'] + '_loads' @@ -1852,24 +1968,24 @@ class JSONRoundTripTest(fixtures.TablesTest): if native: engine = engines.testing_engine(options=dict( - json_serializer=dumps, - json_deserializer=loads - )) + json_serializer=dumps, + json_deserializer=loads + )) else: engine = self._non_native_engine( - json_serializer=dumps, - json_deserializer=loads - ) + json_serializer=dumps, + json_deserializer=loads + ) s = select([ - cast( - { - "key": "value", - "x": "q" - }, - JSON - ) - ]) + cast( + { + "key": "value", + "x": "q" + }, + JSON + ) + ]) eq_( engine.scalar(s), { @@ -1886,7 +2002,6 @@ class JSONRoundTripTest(fixtures.TablesTest): def test_custom_python(self): self._test_custom_serialize_deserialize(False) - @testing.only_on("postgresql+psycopg2") def test_criterion_native(self): engine = testing.db @@ -1924,7 +2039,7 @@ class JSONRoundTripTest(fixtures.TablesTest): data_table = self.tables.data_table result = engine.execute( select([data_table.c.data['k3'].cast(Integer)]).where( - data_table.c.name == 'r5') + data_table.c.name == 'r5') ).first() assert isinstance(result[0], int) @@ -1939,14 +2054,14 @@ class JSONRoundTripTest(fixtures.TablesTest): def _test_fixed_round_trip(self, engine): s = select([ - cast( - { - "key": "value", - "key2": {"k1": "v1", "k2": "v2"} - }, - JSON - ) - ]) + cast( + { + "key": "value", + "key2": {"k1": "v1", "k2": "v2"} + }, + JSON + ) + ]) eq_( engine.scalar(s), { @@ -1976,10 +2091,10 @@ class JSONRoundTripTest(fixtures.TablesTest): ]) eq_( engine.scalar(s), - { - util.u('réveillé'): util.u('réveillé'), - "data": {"k1": util.u('drôle')} - }, + { + util.u('réveillé'): util.u('réveillé'), + "data": {"k1": util.u('drôle')} + }, ) def test_unicode_round_trip_python(self): @@ -1991,16 +2106,18 @@ class JSONRoundTripTest(fixtures.TablesTest): engine = testing.db self._test_unicode_round_trip(engine) + class JSONBTest(JSONTest): + def setup(self): metadata = MetaData() self.test_table = Table('test_table', metadata, - Column('id', Integer, primary_key=True), - Column('test_column', JSONB) - ) + Column('id', Integer, primary_key=True), + Column('test_column', JSONB) + ) self.jsoncol = self.test_table.c.test_column - #Note - add fixture data for arrays [] + # Note - add fixture data for arrays [] def test_where_has_key(self): self._test_where( @@ -2011,9 +2128,9 @@ class JSONBTest(JSONTest): def test_where_has_all(self): self._test_where( - self.jsoncol.has_all({'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}), - "test_table.test_column ?& %(test_column_1)s" - ) + self.jsoncol.has_all( + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}), + "test_table.test_column ?& %(test_column_1)s") def test_where_has_any(self): self._test_where( @@ -2036,5 +2153,3 @@ class JSONBTest(JSONTest): class JSONBRoundTripTest(JSONRoundTripTest): __only_on__ = ('postgresql >= 9.4',) - - |