diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
| commit | 1c23741b8e045d266d0ecbed975952547444a5fa (patch) | |
| tree | 366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/postgresql | |
| parent | 83f3dbc83d1066216084a01b32cddcc090f697d5 (diff) | |
| download | sqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz | |
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/postgresql')
| -rw-r--r-- | test/dialect/postgresql/__init__.py | 0 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_compiler.py | 589 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 221 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_query.py | 723 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 460 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 1679 |
6 files changed, 3672 insertions, 0 deletions
diff --git a/test/dialect/postgresql/__init__.py b/test/dialect/postgresql/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/test/dialect/postgresql/__init__.py diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py new file mode 100644 index 000000000..a79c0e7de --- /dev/null +++ b/test/dialect/postgresql/test_compiler.py @@ -0,0 +1,589 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, assert_raises +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +import datetime +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy.dialects.postgresql import ExcludeConstraint, array +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import base as postgresql +from sqlalchemy.dialects.postgresql import TSRANGE +from sqlalchemy.orm import mapper, aliased, Session +from sqlalchemy.sql import table, column, operators + +class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): + + def test_format(self): + seq = Sequence('my_seq_no_schema') + dialect = postgresql.PGDialect() + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'my_seq_no_schema' + seq = Sequence('my_seq', schema='some_schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'some_schema.my_seq' + seq = Sequence('My_Seq', schema='Some_Schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == '"Some_Schema"."My_Seq"' + + @testing.only_on('postgresql', 'foo') + @testing.provide_metadata + def test_reverse_eng_name(self): + metadata = self.metadata + engine = engines.testing_engine(options=dict(implicit_returning=False)) + for tname, cname in [ + ('tb1' * 30, 'abc'), + ('tb2', 'abc' * 30), + ('tb3' * 30, 'abc' * 30), + ('tb4', 'abc'), + ]: + t = Table(tname[:57], + metadata, + Column(cname[:57], Integer, primary_key=True) + ) + t.create(engine) + r = engine.execute(t.insert()) + assert r.inserted_primary_key == [1] + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + + __dialect__ = postgresql.dialect() + + def test_update_returning(self): + dialect = postgresql.dialect() + table1 = table('mytable', column('myid', Integer), column('name' + , String(128)), column('description', + String(128))) + u = update(table1, values=dict(name='foo' + )).returning(table1.c.myid, table1.c.name) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING mytable.myid, mytable.name', + dialect=dialect) + u = update(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING mytable.myid, mytable.name, ' + 'mytable.description', dialect=dialect) + u = update(table1, values=dict(name='foo' + )).returning(func.length(table1.c.name)) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING length(mytable.name) AS length_1' + , dialect=dialect) + + + def test_insert_returning(self): + dialect = postgresql.dialect() + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + i = insert(table1, values=dict(name='foo' + )).returning(table1.c.myid, table1.c.name) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING mytable.myid, ' + 'mytable.name', dialect=dialect) + i = insert(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING mytable.myid, ' + 'mytable.name, mytable.description', + dialect=dialect) + i = insert(table1, values=dict(name='foo' + )).returning(func.length(table1.c.name)) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING length(mytable.name) ' + 'AS length_1', dialect=dialect) + + + def test_create_partial_index(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', Integer)) + idx = Index('test_idx1', tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data + < 10)) + idx = Index('test_idx1', tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data + < 10)) + + # test quoting and all that + + idx2 = Index('test_idx2', tbl.c.data, + postgresql_where=and_(tbl.c.data > 'a', tbl.c.data + < "b's")) + self.assert_compile(schema.CreateIndex(idx), + 'CREATE INDEX test_idx1 ON testtbl (data) ' + 'WHERE data > 5 AND data < 10', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl (data) " + "WHERE data > 'a' AND data < 'b''s'", + dialect=postgresql.dialect()) + + def test_create_index_with_ops(self): + m = MetaData() + tbl = Table('testtbl', m, + Column('data', String), + Column('data2', Integer, key='d2')) + + idx = Index('test_idx1', tbl.c.data, + postgresql_ops={'data': 'text_pattern_ops'}) + + idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2, + postgresql_ops={'data': 'text_pattern_ops', + 'd2': 'int4_ops'}) + + self.assert_compile(schema.CreateIndex(idx), + 'CREATE INDEX test_idx1 ON testtbl ' + '(data text_pattern_ops)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + 'CREATE INDEX test_idx2 ON testtbl ' + '(data text_pattern_ops, data2 int4_ops)', + dialect=postgresql.dialect()) + + def test_create_index_with_using(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', String)) + + idx1 = Index('test_idx1', tbl.c.data) + idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree') + idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash') + + self.assert_compile(schema.CreateIndex(idx1), + 'CREATE INDEX test_idx1 ON testtbl ' + '(data)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + 'CREATE INDEX test_idx2 ON testtbl ' + 'USING btree (data)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx3), + 'CREATE INDEX test_idx3 ON testtbl ' + 'USING hash (data)', + dialect=postgresql.dialect()) + + def test_exclude_constraint_min(self): + m = MetaData() + tbl = Table('testtbl', m, + Column('room', Integer, primary_key=True)) + cons = ExcludeConstraint(('room', '=')) + tbl.append_constraint(cons) + self.assert_compile(schema.AddConstraint(cons), + 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' + '(room WITH =)', + dialect=postgresql.dialect()) + + def test_exclude_constraint_full(self): + m = MetaData() + room = Column('room', Integer, primary_key=True) + tbl = Table('testtbl', m, + room, + Column('during', TSRANGE)) + room = Column('room', Integer, primary_key=True) + cons = ExcludeConstraint((room, '='), ('during', '&&'), + name='my_name', + using='gist', + where="room > 100", + deferrable=True, + initially='immediate') + tbl.append_constraint(cons) + self.assert_compile(schema.AddConstraint(cons), + 'ALTER TABLE testtbl ADD CONSTRAINT my_name ' + 'EXCLUDE USING gist ' + '(room WITH =, during WITH ''&&) WHERE ' + '(room > 100) DEFERRABLE INITIALLY immediate', + dialect=postgresql.dialect()) + + def test_exclude_constraint_copy(self): + m = MetaData() + cons = ExcludeConstraint(('room', '=')) + tbl = Table('testtbl', m, + Column('room', Integer, primary_key=True), + cons) + # apparently you can't copy a ColumnCollectionConstraint until + # after it has been bound to a table... + cons_copy = cons.copy() + tbl.append_constraint(cons_copy) + self.assert_compile(schema.AddConstraint(cons_copy), + 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' + '(room WITH =)', + dialect=postgresql.dialect()) + + def test_substring(self): + self.assert_compile(func.substring('abc', 1, 2), + 'SUBSTRING(%(substring_1)s FROM %(substring_2)s ' + 'FOR %(substring_3)s)') + self.assert_compile(func.substring('abc', 1), + 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') + + + + def test_extract(self): + t = table('t', column('col1', DateTime), column('col2', Date), + column('col3', Time), column('col4', + postgresql.INTERVAL)) + for field in 'year', 'month', 'day', 'epoch', 'hour': + for expr, compiled_expr in [ # invalid, no cast. plain + # text. no cast. addition is + # commutative subtraction is + # not invalid - no cast. dont + # crack up on entirely + # unsupported types + (t.c.col1, 't.col1 :: timestamp'), + (t.c.col2, 't.col2 :: date'), + (t.c.col3, 't.col3 :: time'), + (func.current_timestamp() - datetime.timedelta(days=5), + '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: ' + 'timestamp'), + (func.current_timestamp() + func.current_timestamp(), + 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'), + (text('foo.date + foo.time'), 'foo.date + foo.time'), + (func.current_timestamp() + datetime.timedelta(days=5), + '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: ' + 'timestamp'), + (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp' + ), + (t.c.col2 + datetime.timedelta(days=5), + '(t.col2 + %(col2_1)s) :: timestamp'), + (datetime.timedelta(days=5) + t.c.col2, + '(%(col2_1)s + t.col2) :: timestamp'), + (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp' + ), + (t.c.col1 - datetime.timedelta(seconds=30), + '(t.col1 - %(col1_1)s) :: timestamp'), + (datetime.timedelta(seconds=30) - t.c.col1, + '%(col1_1)s - t.col1'), + (func.coalesce(t.c.col1, func.current_timestamp()), + 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'), + (t.c.col3 + datetime.timedelta(seconds=30), + '(t.col3 + %(col3_1)s) :: time'), + (func.current_timestamp() - func.coalesce(t.c.col1, + func.current_timestamp()), + '(CURRENT_TIMESTAMP - coalesce(t.col1, ' + 'CURRENT_TIMESTAMP)) :: interval'), + (3 * func.foobar(type_=Interval), + '(%(foobar_1)s * foobar()) :: interval'), + (literal(datetime.timedelta(seconds=10)) + - literal(datetime.timedelta(seconds=10)), + '(%(param_1)s - %(param_2)s) :: interval'), + (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'), + ]: + self.assert_compile(select([extract(field, + expr)]).select_from(t), + 'SELECT EXTRACT(%s FROM %s) AS ' + 'anon_1 FROM t' % (field, + compiled_expr)) + + def test_reserved_words(self): + table = Table("pg_table", MetaData(), + Column("col1", Integer), + Column("variadic", Integer)) + x = select([table.c.col1, table.c.variadic]) + + self.assert_compile(x, + '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''') + + def test_array(self): + c = Column('x', postgresql.ARRAY(Integer)) + + self.assert_compile( + cast(c, postgresql.ARRAY(Integer)), + "CAST(x AS INTEGER[])" + ) + self.assert_compile( + c[5], + "x[%(x_1)s]", + checkparams={'x_1': 5} + ) + + self.assert_compile( + c[5:7], + "x[%(x_1)s:%(x_2)s]", + checkparams={'x_2': 7, 'x_1': 5} + ) + self.assert_compile( + c[5:7][2:3], + "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", + checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3} + ) + self.assert_compile( + c[5:7][3], + "x[%(x_1)s:%(x_2)s][%(param_1)s]", + checkparams={'x_2': 7, 'x_1': 5, 'param_1':3} + ) + + self.assert_compile( + c.contains([1]), + 'x @> %(x_1)s', + checkparams={'x_1': [1]} + ) + self.assert_compile( + c.contained_by([2]), + 'x <@ %(x_1)s', + checkparams={'x_1': [2]} + ) + self.assert_compile( + c.overlap([3]), + 'x && %(x_1)s', + checkparams={'x_1': [3]} + ) + self.assert_compile( + postgresql.Any(4, c), + '%(param_1)s = ANY (x)', + checkparams={'param_1': 4} + ) + self.assert_compile( + c.any(5, operator=operators.ne), + '%(param_1)s != ANY (x)', + checkparams={'param_1': 5} + ) + self.assert_compile( + postgresql.All(6, c, operator=operators.gt), + '%(param_1)s > ALL (x)', + checkparams={'param_1': 6} + ) + self.assert_compile( + c.all(7, operator=operators.lt), + '%(param_1)s < ALL (x)', + checkparams={'param_1': 7} + ) + + def test_array_literal_type(self): + is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY) + is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) + + is_(postgresql.array([1, 2], type_=String). + type.item_type._type_affinity, String) + + def test_array_literal(self): + self.assert_compile( + func.array_dims(postgresql.array([1, 2]) + + postgresql.array([3, 4, 5])), + "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " + "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", + checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1, + 'param_3': 3, 'param_2': 2} + ) + + def test_array_literal_insert(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.insert().values(data=array([1, 2, 3])), + "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " + "%(param_2)s, %(param_3)s])" + ) + + def test_update_array_element(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.update().values({t.c.data[5]: 1}), + "UPDATE t SET data[%(data_1)s]=%(param_1)s", + checkparams={'data_1': 5, 'param_1': 1} + ) + + def test_update_array_slice(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.update().values({t.c.data[2:5]: 2}), + "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s", + checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2} + + ) + + def test_from_only(self): + m = MetaData() + tbl1 = Table('testtbl1', m, Column('id', Integer)) + tbl2 = Table('testtbl2', m, Column('id', Integer)) + + stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql') + expected = 'SELECT testtbl1.id FROM ONLY testtbl1' + self.assert_compile(stmt, expected) + + talias1 = tbl1.alias('foo') + stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql') + expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo' + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' + 'testtbl2') + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY ' + 'testtbl2') + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]) + stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql') + stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' + 'ONLY testtbl2') + self.assert_compile(stmt, expected) + + stmt = update(tbl1, values=dict(id=1)) + stmt = stmt.with_hint('ONLY', dialect_name='postgresql') + expected = 'UPDATE ONLY testtbl1 SET id=%(id)s' + self.assert_compile(stmt, expected) + + stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql') + expected = 'DELETE FROM ONLY testtbl1' + self.assert_compile(stmt, expected) + + tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema') + stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql') + expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3' + self.assert_compile(stmt, expected) + + assert_raises( + exc.CompileError, + tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile, + dialect=postgresql.dialect() + ) + + + +class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): + """Test 'DISTINCT' with SQL expression language and orm.Query with + an emphasis on PG's 'DISTINCT ON' syntax. + + """ + __dialect__ = postgresql.dialect() + + def setup(self): + self.table = Table('t', MetaData(), + Column('id',Integer, primary_key=True), + Column('a', String), + Column('b', String), + ) + + def test_plain_generative(self): + self.assert_compile( + select([self.table]).distinct(), + "SELECT DISTINCT t.id, t.a, t.b FROM t" + ) + + def test_on_columns_generative(self): + self.assert_compile( + select([self.table]).distinct(self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + ) + + def test_on_columns_generative_multi_call(self): + self.assert_compile( + select([self.table]).distinct(self.table.c.a). + distinct(self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t" + ) + + def test_plain_inline(self): + self.assert_compile( + select([self.table], distinct=True), + "SELECT DISTINCT t.id, t.a, t.b FROM t" + ) + + def test_on_columns_inline_list(self): + self.assert_compile( + select([self.table], + distinct=[self.table.c.a, self.table.c.b]). + order_by(self.table.c.a, self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id, " + "t.a, t.b FROM t ORDER BY t.a, t.b" + ) + + def test_on_columns_inline_scalar(self): + self.assert_compile( + select([self.table], distinct=self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + ) + + def test_query_plain(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(), + "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns_multi_call(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(self.table.c.a). + distinct(self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns_subquery(self): + sess = Session() + class Foo(object): + pass + mapper(Foo, self.table) + sess = Session() + self.assert_compile( + sess.query(Foo).from_self().distinct(Foo.a, Foo.b), + "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id " + "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b " + "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t) AS anon_1" + ) + + def test_query_distinct_on_aliased(self): + class Foo(object): + pass + mapper(Foo, self.table) + a1 = aliased(Foo) + sess = Session() + self.assert_compile( + sess.query(a1).distinct(a1.a), + "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, " + "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1" + ) + + def test_distinct_on_subquery_anon(self): + + sq = select([self.table]).alias() + q = select([self.table.c.id,sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id==sq.c.id) + + self.assert_compile( + q, + "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id " + "FROM t, (SELECT t.id AS id, t.a AS a, t.b " + "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id" + ) + + def test_distinct_on_subquery_named(self): + sq = select([self.table]).alias('sq') + q = select([self.table.c.id,sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id==sq.c.id) + self.assert_compile( + q, + "SELECT DISTINCT ON (sq.id) t.id, sq.id " + "FROM t, (SELECT t.id AS id, t.a AS a, " + "t.b AS b FROM t) AS sq WHERE t.id = sq.id" + ) diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py new file mode 100644 index 000000000..86ce91dc9 --- /dev/null +++ b/test/dialect/postgresql/test_dialect.py @@ -0,0 +1,221 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import eq_, assert_raises, \ + assert_raises_message, AssertsExecutionResults, \ + AssertsCompiledSQL +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +import datetime +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import base as postgresql +import logging +import logging.handlers + +class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): + + __only_on__ = 'postgresql' + + @testing.provide_metadata + def test_date_reflection(self): + metadata = self.metadata + t1 = Table('pgdate', metadata, Column('date1', + DateTime(timezone=True)), Column('date2', + DateTime(timezone=False))) + metadata.create_all() + m2 = MetaData(testing.db) + t2 = Table('pgdate', m2, autoload=True) + assert t2.c.date1.type.timezone is True + assert t2.c.date2.type.timezone is False + + @testing.fails_on('+zxjdbc', + 'The JDBC driver handles the version parsing') + def test_version_parsing(self): + + + class MockConn(object): + + def __init__(self, res): + self.res = res + + def execute(self, str): + return self + + def scalar(self): + return self.res + + + for string, version in \ + [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by ' + 'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3, + 8)), + ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, ' + 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)), + ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, ' + 'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), ' + '64-bit', (9, 1, 2))]: + eq_(testing.db.dialect._get_server_version_info(MockConn(string)), + version) + + @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') + def test_psycopg2_version(self): + v = testing.db.dialect.psycopg2_version + assert testing.db.dialect.dbapi.__version__.\ + startswith(".".join(str(x) for x in v)) + + @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') + def test_notice_logging(self): + log = logging.getLogger('sqlalchemy.dialects.postgresql') + buf = logging.handlers.BufferingHandler(100) + lev = log.level + log.addHandler(buf) + log.setLevel(logging.INFO) + try: + conn = testing.db.connect() + trans = conn.begin() + try: + conn.execute('create table foo (id serial primary key)') + finally: + trans.rollback() + finally: + log.removeHandler(buf) + log.setLevel(lev) + msgs = ' '.join(b.msg for b in buf.buffer) + assert 'will create implicit sequence' in msgs + assert 'will create implicit index' in msgs + + @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') + @engines.close_open_connections + def test_client_encoding(self): + c = testing.db.connect() + current_encoding = c.connection.connection.encoding + c.close() + + # attempt to use an encoding that's not + # already set + if current_encoding == 'UTF8': + test_encoding = 'LATIN1' + else: + test_encoding = 'UTF8' + + e = engines.testing_engine( + options={'client_encoding':test_encoding} + ) + c = e.connect() + eq_(c.connection.connection.encoding, test_encoding) + + @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') + @engines.close_open_connections + def test_autocommit_isolation_level(self): + extensions = __import__('psycopg2.extensions').extensions + + c = testing.db.connect() + c = c.execution_options(isolation_level='AUTOCOMMIT') + eq_(c.connection.connection.isolation_level, + extensions.ISOLATION_LEVEL_AUTOCOMMIT) + + @testing.fails_on('+zxjdbc', + "Can't infer the SQL type to use for an instance " + "of org.python.core.PyObjectDerived.") + @testing.fails_on('+pg8000', "Can't determine correct type.") + def test_extract(self): + fivedaysago = datetime.datetime.now() \ + - datetime.timedelta(days=5) + for field, exp in ('year', fivedaysago.year), ('month', + fivedaysago.month), ('day', fivedaysago.day): + r = testing.db.execute(select([extract(field, func.now() + + datetime.timedelta(days=-5))])).scalar() + eq_(r, exp) + + def test_checksfor_sequence(self): + meta1 = MetaData(testing.db) + seq = Sequence('fooseq') + t = Table('mytable', meta1, Column('col1', Integer, + seq)) + seq.drop() + try: + testing.db.execute('CREATE SEQUENCE fooseq') + t.create(checkfirst=True) + finally: + t.drop(checkfirst=True) + + def test_schema_roundtrips(self): + meta = MetaData(testing.db) + users = Table('users', meta, Column('id', Integer, + primary_key=True), Column('name', String(50)), + schema='test_schema') + users.create() + try: + users.insert().execute(id=1, name='name1') + users.insert().execute(id=2, name='name2') + users.insert().execute(id=3, name='name3') + users.insert().execute(id=4, name='name4') + eq_(users.select().where(users.c.name == 'name2' + ).execute().fetchall(), [(2, 'name2')]) + eq_(users.select(use_labels=True).where(users.c.name + == 'name2').execute().fetchall(), [(2, 'name2')]) + users.delete().where(users.c.id == 3).execute() + eq_(users.select().where(users.c.name == 'name3' + ).execute().fetchall(), []) + users.update().where(users.c.name == 'name4' + ).execute(name='newname') + eq_(users.select(use_labels=True).where(users.c.id + == 4).execute().fetchall(), [(4, 'newname')]) + finally: + users.drop() + + def test_preexecute_passivedefault(self): + """test that when we get a primary key column back from + reflecting a table which has a default value on it, we pre- + execute that DefaultClause upon insert.""" + + try: + meta = MetaData(testing.db) + testing.db.execute(""" + CREATE TABLE speedy_users + ( + speedy_user_id SERIAL PRIMARY KEY, + + user_name VARCHAR NOT NULL, + user_password VARCHAR NOT NULL + ); + """) + t = Table('speedy_users', meta, autoload=True) + r = t.insert().execute(user_name='user', + user_password='lala') + assert r.inserted_primary_key == [1] + l = t.select().execute().fetchall() + assert l == [(1, 'user', 'lala')] + finally: + testing.db.execute('drop table speedy_users') + + + @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion') + @testing.fails_on('pypostgresql', + 'psycopg2/pg8000 specific assertion') + def test_numeric_raise(self): + stmt = text("select cast('hi' as char) as hi", typemap={'hi' + : Numeric}) + assert_raises(exc.InvalidRequestError, testing.db.execute, stmt) + + def test_serial_integer(self): + for type_, expected in [ + (Integer, 'SERIAL'), + (BigInteger, 'BIGSERIAL'), + (SmallInteger, 'SMALLINT'), + (postgresql.INTEGER, 'SERIAL'), + (postgresql.BIGINT, 'BIGSERIAL'), + ]: + m = MetaData() + + t = Table('t', m, Column('c', type_, primary_key=True)) + ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t)) + eq_( + ddl_compiler.get_column_specification(t.c.c), + "c %s NOT NULL" % expected + ) diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py new file mode 100644 index 000000000..a7bcbf3da --- /dev/null +++ b/test/dialect/postgresql/test_query.py @@ -0,0 +1,723 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import eq_, assert_raises, \ + assert_raises_message, is_, AssertsExecutionResults, \ + AssertsCompiledSQL, ComparesTables +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy import exc +import logging + +class InsertTest(fixtures.TestBase, AssertsExecutionResults): + + __only_on__ = 'postgresql' + + @classmethod + def setup_class(cls): + global metadata + cls.engine = testing.db + metadata = MetaData(testing.db) + + def teardown(self): + metadata.drop_all() + metadata.clear() + if self.engine is not testing.db: + self.engine.dispose() + + def test_compiled_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + primary_key=True), Column('data', String(30))) + metadata.create_all() + ins = table.insert(inline=True, values={'data': bindparam('x' + )}).compile() + ins.execute({'x': 'five'}, {'x': 'seven'}) + assert table.select().execute().fetchall() == [(1, 'five'), (2, + 'seven')] + + def test_foreignkey_missing_insert(self): + t1 = Table('t1', metadata, Column('id', Integer, + primary_key=True)) + t2 = Table('t2', metadata, Column('id', Integer, + ForeignKey('t1.id'), primary_key=True)) + metadata.create_all() + + # want to ensure that "null value in column "id" violates not- + # null constraint" is raised (IntegrityError on psycoopg2, but + # ProgrammingError on pg8000), and not "ProgrammingError: + # (ProgrammingError) relationship "t2_id_seq" does not exist". + # the latter corresponds to autoincrement behavior, which is not + # the case here due to the foreign key. + + for eng in [engines.testing_engine(options={'implicit_returning' + : False}), + engines.testing_engine(options={'implicit_returning' + : True})]: + assert_raises_message(exc.DBAPIError, + 'violates not-null constraint', + eng.execute, t2.insert()) + + def test_sequence_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + Sequence('my_seq'), primary_key=True), + Column('data', String(30))) + metadata.create_all() + self._assert_data_with_sequence(table, 'my_seq') + + def test_sequence_returning_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + Sequence('my_seq'), primary_key=True), + Column('data', String(30))) + metadata.create_all() + self._assert_data_with_sequence_returning(table, 'my_seq') + + def test_opt_sequence_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + Sequence('my_seq', optional=True), + primary_key=True), Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement(table) + + def test_opt_sequence_returning_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + Sequence('my_seq', optional=True), + primary_key=True), Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement_returning(table) + + def test_autoincrement_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + primary_key=True), Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement(table) + + def test_autoincrement_returning_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + primary_key=True), Column('data', String(30))) + metadata.create_all() + self._assert_data_autoincrement_returning(table) + + def test_noautoincrement_insert(self): + table = Table('testtable', metadata, Column('id', Integer, + primary_key=True, autoincrement=False), + Column('data', String(30))) + metadata.create_all() + self._assert_data_noautoincrement(table) + + def _assert_data_autoincrement(self, table): + self.engine = \ + engines.testing_engine(options={'implicit_returning' + : False}) + metadata.bind = self.engine + + def go(): + + # execute with explicit id + + r = table.insert().execute({'id': 30, 'data': 'd1'}) + assert r.inserted_primary_key == [30] + + # execute with prefetch id + + r = table.insert().execute({'data': 'd2'}) + assert r.inserted_primary_key == [1] + + # executemany with explicit ids + + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + + # executemany, uses SERIAL + + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + + # single execute, explicit id, inline + + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + + # single execute, inline, uses SERIAL + + table.insert(inline=True).execute({'data': 'd8'}) + + # note that the test framework doesnt capture the "preexecute" + # of a seqeuence or default. we just see it in the bind params. + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 1, 'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + table.delete().execute() + + # test the same series of events using a reflected version of + # the table + + m2 = MetaData(self.engine) + table = Table(table.name, m2, autoload=True) + + def go(): + table.insert().execute({'id': 30, 'data': 'd1'}) + r = table.insert().execute({'data': 'd2'}) + assert r.inserted_primary_key == [5] + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + table.insert(inline=True).execute({'data': 'd8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 5, 'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (5, 'd2'), + (31, 'd3'), + (32, 'd4'), + (6, 'd5'), + (7, 'd6'), + (33, 'd7'), + (8, 'd8'), + ] + table.delete().execute() + + def _assert_data_autoincrement_returning(self, table): + self.engine = \ + engines.testing_engine(options={'implicit_returning': True}) + metadata.bind = self.engine + + def go(): + + # execute with explicit id + + r = table.insert().execute({'id': 30, 'data': 'd1'}) + assert r.inserted_primary_key == [30] + + # execute with prefetch id + + r = table.insert().execute({'data': 'd2'}) + assert r.inserted_primary_key == [1] + + # executemany with explicit ids + + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + + # executemany, uses SERIAL + + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + + # single execute, explicit id, inline + + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + + # single execute, inline, uses SERIAL + + table.insert(inline=True).execute({'data': 'd8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' + 'testtable.id', {'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + table.delete().execute() + + # test the same series of events using a reflected version of + # the table + + m2 = MetaData(self.engine) + table = Table(table.name, m2, autoload=True) + + def go(): + table.insert().execute({'id': 30, 'data': 'd1'}) + r = table.insert().execute({'data': 'd2'}) + assert r.inserted_primary_key == [5] + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + table.insert(inline=True).execute({'data': 'd8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' + 'testtable.id', {'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ('INSERT INTO testtable (data) VALUES (:data)', [{'data' + : 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (5, 'd2'), + (31, 'd3'), + (32, 'd4'), + (6, 'd5'), + (7, 'd6'), + (33, 'd7'), + (8, 'd8'), + ] + table.delete().execute() + + def _assert_data_with_sequence(self, table, seqname): + self.engine = \ + engines.testing_engine(options={'implicit_returning' + : False}) + metadata.bind = self.engine + + def go(): + table.insert().execute({'id': 30, 'data': 'd1'}) + table.insert().execute({'data': 'd2'}) + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + table.insert(inline=True).execute({'data': 'd8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 1, 'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + ":data)" % seqname, [{'data': 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + + # cant test reflection here since the Sequence must be + # explicitly specified + + def _assert_data_with_sequence_returning(self, table, seqname): + self.engine = \ + engines.testing_engine(options={'implicit_returning': True}) + metadata.bind = self.engine + + def go(): + table.insert().execute({'id': 30, 'data': 'd1'}) + table.insert().execute({'data': 'd2'}) + table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, + 'data': 'd4'}) + table.insert().execute({'data': 'd5'}, {'data': 'd6'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) + table.insert(inline=True).execute({'data': 'd8'}) + + self.assert_sql(self.engine, go, [], with_sequences=[ + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + {'id': 30, 'data': 'd1'}), + ("INSERT INTO testtable (id, data) VALUES " + "(nextval('my_seq'), :data) RETURNING testtable.id", + {'data': 'd2'}), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), + ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), + ('INSERT INTO testtable (id, data) VALUES (:id, :data)', + [{'id': 33, 'data': 'd7'}]), + ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " + ":data)" % seqname, [{'data': 'd8'}]), + ]) + assert table.select().execute().fetchall() == [ + (30, 'd1'), + (1, 'd2'), + (31, 'd3'), + (32, 'd4'), + (2, 'd5'), + (3, 'd6'), + (33, 'd7'), + (4, 'd8'), + ] + + # cant test reflection here since the Sequence must be + # explicitly specified + + def _assert_data_noautoincrement(self, table): + self.engine = \ + engines.testing_engine(options={'implicit_returning' + : False}) + metadata.bind = self.engine + table.insert().execute({'id': 30, 'data': 'd1'}) + if self.engine.driver == 'pg8000': + exception_cls = exc.ProgrammingError + elif self.engine.driver == 'pypostgresql': + exception_cls = Exception + else: + exception_cls = exc.IntegrityError + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}) + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}) + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) + table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, + 'data': 'd3'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) + assert table.select().execute().fetchall() == [(30, 'd1'), (31, + 'd2'), (32, 'd3'), (33, 'd4')] + table.delete().execute() + + # test the same series of events using a reflected version of + # the table + + m2 = MetaData(self.engine) + table = Table(table.name, m2, autoload=True) + table.insert().execute({'id': 30, 'data': 'd1'}) + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}) + assert_raises_message(exception_cls, + 'violates not-null constraint', + table.insert().execute, {'data': 'd2'}, + {'data': 'd3'}) + table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, + 'data': 'd3'}) + table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) + assert table.select().execute().fetchall() == [(30, 'd1'), (31, + 'd2'), (32, 'd3'), (33, 'd4')] + + +class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): + + __only_on__ = 'postgresql+psycopg2' + + def _fixture(self, server_side_cursors): + self.engine = engines.testing_engine( + options={'server_side_cursors':server_side_cursors} + ) + return self.engine + + def tearDown(self): + engines.testing_reaper.close_all() + self.engine.dispose() + + def test_global_string(self): + engine = self._fixture(True) + result = engine.execute('select 1') + assert result.cursor.name + + def test_global_text(self): + engine = self._fixture(True) + result = engine.execute(text('select 1')) + assert result.cursor.name + + def test_global_expr(self): + engine = self._fixture(True) + result = engine.execute(select([1])) + assert result.cursor.name + + def test_global_off_explicit(self): + engine = self._fixture(False) + result = engine.execute(text('select 1')) + + # It should be off globally ... + + assert not result.cursor.name + + def test_stmt_option(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + result = engine.execute(s) + + # ... but enabled for this one. + + assert result.cursor.name + + + def test_conn_option(self): + engine = self._fixture(False) + + # and this one + result = \ + engine.connect().execution_options(stream_results=True).\ + execute('select 1' + ) + assert result.cursor.name + + def test_stmt_enabled_conn_option_disabled(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + + # not this one + result = \ + engine.connect().execution_options(stream_results=False).\ + execute(s) + assert not result.cursor.name + + def test_stmt_option_disabled(self): + engine = self._fixture(True) + s = select([1]).execution_options(stream_results=False) + result = engine.execute(s) + assert not result.cursor.name + + def test_aliases_and_ss(self): + engine = self._fixture(False) + s1 = select([1]).execution_options(stream_results=True).alias() + result = engine.execute(s1) + assert result.cursor.name + + # s1's options shouldn't affect s2 when s2 is used as a + # from_obj. + s2 = select([1], from_obj=s1) + result = engine.execute(s2) + assert not result.cursor.name + + def test_for_update_expr(self): + engine = self._fixture(True) + s1 = select([1], for_update=True) + result = engine.execute(s1) + assert result.cursor.name + + def test_for_update_string(self): + engine = self._fixture(True) + result = engine.execute('SELECT 1 FOR UPDATE') + assert result.cursor.name + + def test_text_no_ss(self): + engine = self._fixture(False) + s = text('select 42') + result = engine.execute(s) + assert not result.cursor.name + + def test_text_ss_option(self): + engine = self._fixture(False) + s = text('select 42').execution_options(stream_results=True) + result = engine.execute(s) + assert result.cursor.name + + def test_roundtrip(self): + engine = self._fixture(True) + test_table = Table('test_table', MetaData(engine), + Column('id', Integer, primary_key=True), + Column('data', String(50))) + test_table.create(checkfirst=True) + try: + test_table.insert().execute(data='data1') + nextid = engine.execute(Sequence('test_table_id_seq')) + test_table.insert().execute(id=nextid, data='data2') + eq_(test_table.select().execute().fetchall(), [(1, 'data1' + ), (2, 'data2')]) + test_table.update().where(test_table.c.id + == 2).values(data=test_table.c.data + ' updated' + ).execute() + eq_(test_table.select().execute().fetchall(), [(1, 'data1' + ), (2, 'data2 updated')]) + test_table.delete().execute() + eq_(test_table.count().scalar(), 0) + finally: + test_table.drop(checkfirst=True) + + +class MatchTest(fixtures.TestBase, AssertsCompiledSQL): + + __only_on__ = 'postgresql' + __excluded_on__ = ('postgresql', '<', (8, 3, 0)), + + @classmethod + def setup_class(cls): + global metadata, cattable, matchtable + metadata = MetaData(testing.db) + cattable = Table('cattable', metadata, Column('id', Integer, + primary_key=True), Column('description', + String(50))) + matchtable = Table('matchtable', metadata, Column('id', + Integer, primary_key=True), Column('title', + String(200)), Column('category_id', Integer, + ForeignKey('cattable.id'))) + metadata.create_all() + cattable.insert().execute([{'id': 1, 'description': 'Python'}, + {'id': 2, 'description': 'Ruby'}]) + matchtable.insert().execute([{'id': 1, 'title' + : 'Agile Web Development with Rails' + , 'category_id': 2}, + {'id': 2, + 'title': 'Dive Into Python', + 'category_id': 1}, + {'id': 3, 'title' + : "Programming Matz's Ruby", + 'category_id': 2}, + {'id': 4, 'title' + : 'The Definitive Guide to Django', + 'category_id': 1}, + {'id': 5, 'title' + : 'Python in a Nutshell', + 'category_id': 1}]) + + @classmethod + def teardown_class(cls): + metadata.drop_all() + + @testing.fails_on('postgresql+pg8000', 'uses positional') + @testing.fails_on('postgresql+zxjdbc', 'uses qmark') + def test_expression_pyformat(self): + self.assert_compile(matchtable.c.title.match('somstr'), + 'matchtable.title @@ to_tsquery(%(title_1)s' + ')') + + @testing.fails_on('postgresql+psycopg2', 'uses pyformat') + @testing.fails_on('postgresql+pypostgresql', 'uses pyformat') + @testing.fails_on('postgresql+zxjdbc', 'uses qmark') + def test_expression_positional(self): + self.assert_compile(matchtable.c.title.match('somstr'), + 'matchtable.title @@ to_tsquery(%s)') + + def test_simple_match(self): + results = \ + matchtable.select().where(matchtable.c.title.match('python' + )).order_by(matchtable.c.id).execute().fetchall() + eq_([2, 5], [r.id for r in results]) + + def test_simple_match_with_apostrophe(self): + results = \ + matchtable.select().where(matchtable.c.title.match("Matz's" + )).execute().fetchall() + eq_([3], [r.id for r in results]) + + def test_simple_derivative_match(self): + results = \ + matchtable.select().where(matchtable.c.title.match('nutshells' + )).execute().fetchall() + eq_([5], [r.id for r in results]) + + def test_or_match(self): + results1 = \ + matchtable.select().where(or_(matchtable.c.title.match('nutshells' + ), matchtable.c.title.match('rubies' + ))).order_by(matchtable.c.id).execute().fetchall() + eq_([3, 5], [r.id for r in results1]) + results2 = \ + matchtable.select().where( + matchtable.c.title.match('nutshells | rubies' + )).order_by(matchtable.c.id).execute().fetchall() + eq_([3, 5], [r.id for r in results2]) + + def test_and_match(self): + results1 = \ + matchtable.select().where(and_(matchtable.c.title.match('python' + ), matchtable.c.title.match('nutshells' + ))).execute().fetchall() + eq_([5], [r.id for r in results1]) + results2 = \ + matchtable.select().where( + matchtable.c.title.match('python & nutshells' + )).execute().fetchall() + eq_([5], [r.id for r in results2]) + + def test_match_across_joins(self): + results = matchtable.select().where(and_(cattable.c.id + == matchtable.c.category_id, + or_(cattable.c.description.match('Ruby'), + matchtable.c.title.match('nutshells' + )))).order_by(matchtable.c.id).execute().fetchall() + eq_([1, 3, 5], [r.id for r in results]) + + +class TupleTest(fixtures.TestBase): + __only_on__ = 'postgresql' + + def test_tuple_containment(self): + + for test, exp in [ + ([('a', 'b')], True), + ([('a', 'c')], False), + ([('f', 'q'), ('a', 'b')], True), + ([('f', 'q'), ('a', 'c')], False) + ]: + eq_( + testing.db.execute( + select([ + tuple_( + literal_column("'a'"), + literal_column("'b'") + ).\ + in_([ + tuple_(*[ + literal_column("'%s'" % letter) + for letter in elem + ]) for elem in test + ]) + ]) + ).scalar(), + exp + ) diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py new file mode 100644 index 000000000..fb399b546 --- /dev/null +++ b/test/dialect/postgresql/test_reflection.py @@ -0,0 +1,460 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import eq_, assert_raises, \ + assert_raises_message, is_, AssertsExecutionResults, \ + AssertsCompiledSQL, ComparesTables +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy import exc +from sqlalchemy.dialects.postgresql import base as postgresql +import logging + +class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): + + """Test PostgreSQL domains""" + + __only_on__ = 'postgresql' + + @classmethod + def setup_class(cls): + con = testing.db.connect() + for ddl in \ + 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \ + 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \ + "CREATE TYPE testtype AS ENUM ('test')", \ + 'CREATE DOMAIN enumdomain AS testtype'\ + : + try: + con.execute(ddl) + except exc.DBAPIError as e: + if not 'already exists' in str(e): + raise e + con.execute('CREATE TABLE testtable (question integer, answer ' + 'testdomain)') + con.execute('CREATE TABLE test_schema.testtable(question ' + 'integer, answer test_schema.testdomain, anything ' + 'integer)') + con.execute('CREATE TABLE crosschema (question integer, answer ' + 'test_schema.testdomain)') + + con.execute('CREATE TABLE enum_test (id integer, data enumdomain)') + + @classmethod + def teardown_class(cls): + con = testing.db.connect() + con.execute('DROP TABLE testtable') + con.execute('DROP TABLE test_schema.testtable') + con.execute('DROP TABLE crosschema') + con.execute('DROP DOMAIN testdomain') + con.execute('DROP DOMAIN test_schema.testdomain') + con.execute("DROP TABLE enum_test") + con.execute("DROP DOMAIN enumdomain") + con.execute("DROP TYPE testtype") + + def test_table_is_reflected(self): + metadata = MetaData(testing.db) + table = Table('testtable', metadata, autoload=True) + eq_(set(table.columns.keys()), set(['question', 'answer']), + "Columns of reflected table didn't equal expected columns") + assert isinstance(table.c.answer.type, Integer) + + def test_domain_is_reflected(self): + metadata = MetaData(testing.db) + table = Table('testtable', metadata, autoload=True) + eq_(str(table.columns.answer.server_default.arg), '42', + "Reflected default value didn't equal expected value") + assert not table.columns.answer.nullable, \ + 'Expected reflected column to not be nullable.' + + def test_enum_domain_is_reflected(self): + metadata = MetaData(testing.db) + table = Table('enum_test', metadata, autoload=True) + eq_( + table.c.data.type.enums, + ('test', ) + ) + + def test_table_is_reflected_test_schema(self): + metadata = MetaData(testing.db) + table = Table('testtable', metadata, autoload=True, + schema='test_schema') + eq_(set(table.columns.keys()), set(['question', 'answer', + 'anything']), + "Columns of reflected table didn't equal expected columns") + assert isinstance(table.c.anything.type, Integer) + + def test_schema_domain_is_reflected(self): + metadata = MetaData(testing.db) + table = Table('testtable', metadata, autoload=True, + schema='test_schema') + eq_(str(table.columns.answer.server_default.arg), '0', + "Reflected default value didn't equal expected value") + assert table.columns.answer.nullable, \ + 'Expected reflected column to be nullable.' + + def test_crosschema_domain_is_reflected(self): + metadata = MetaData(testing.db) + table = Table('crosschema', metadata, autoload=True) + eq_(str(table.columns.answer.server_default.arg), '0', + "Reflected default value didn't equal expected value") + assert table.columns.answer.nullable, \ + 'Expected reflected column to be nullable.' + + def test_unknown_types(self): + from sqlalchemy.databases import postgresql + ischema_names = postgresql.PGDialect.ischema_names + postgresql.PGDialect.ischema_names = {} + try: + m2 = MetaData(testing.db) + assert_raises(exc.SAWarning, Table, 'testtable', m2, + autoload=True) + + @testing.emits_warning('Did not recognize type') + def warns(): + m3 = MetaData(testing.db) + t3 = Table('testtable', m3, autoload=True) + assert t3.c.answer.type.__class__ == sa.types.NullType + finally: + postgresql.PGDialect.ischema_names = ischema_names + + +class ReflectionTest(fixtures.TestBase): + __only_on__ = 'postgresql' + + @testing.fails_if(('postgresql', '<', (8, 4)), + "newer query is bypassed due to unsupported SQL functions") + @testing.provide_metadata + def test_reflected_primary_key_order(self): + meta1 = self.metadata + subject = Table('subject', meta1, + Column('p1', Integer, primary_key=True), + Column('p2', Integer, primary_key=True), + PrimaryKeyConstraint('p2', 'p1') + ) + meta1.create_all() + meta2 = MetaData(testing.db) + subject = Table('subject', meta2, autoload=True) + eq_(subject.primary_key.columns.keys(), ['p2', 'p1']) + + @testing.provide_metadata + def test_pg_weirdchar_reflection(self): + meta1 = self.metadata + subject = Table('subject', meta1, Column('id$', Integer, + primary_key=True)) + referer = Table('referer', meta1, Column('id', Integer, + primary_key=True), Column('ref', Integer, + ForeignKey('subject.id$'))) + meta1.create_all() + meta2 = MetaData(testing.db) + subject = Table('subject', meta2, autoload=True) + referer = Table('referer', meta2, autoload=True) + self.assert_((subject.c['id$'] + == referer.c.ref).compare( + subject.join(referer).onclause)) + + @testing.provide_metadata + def test_renamed_sequence_reflection(self): + metadata = self.metadata + t = Table('t', metadata, Column('id', Integer, primary_key=True)) + metadata.create_all() + m2 = MetaData(testing.db) + t2 = Table('t', m2, autoload=True, implicit_returning=False) + eq_(t2.c.id.server_default.arg.text, + "nextval('t_id_seq'::regclass)") + r = t2.insert().execute() + eq_(r.inserted_primary_key, [1]) + testing.db.connect().execution_options(autocommit=True).\ + execute('alter table t_id_seq rename to foobar_id_seq' + ) + m3 = MetaData(testing.db) + t3 = Table('t', m3, autoload=True, implicit_returning=False) + eq_(t3.c.id.server_default.arg.text, + "nextval('foobar_id_seq'::regclass)") + r = t3.insert().execute() + eq_(r.inserted_primary_key, [2]) + + @testing.provide_metadata + def test_renamed_pk_reflection(self): + metadata = self.metadata + t = Table('t', metadata, Column('id', Integer, primary_key=True)) + metadata.create_all() + testing.db.connect().execution_options(autocommit=True).\ + execute('alter table t rename id to t_id') + m2 = MetaData(testing.db) + t2 = Table('t', m2, autoload=True) + eq_([c.name for c in t2.primary_key], ['t_id']) + + @testing.provide_metadata + def test_schema_reflection(self): + """note: this test requires that the 'test_schema' schema be + separate and accessible by the test user""" + + meta1 = self.metadata + + users = Table('users', meta1, Column('user_id', Integer, + primary_key=True), Column('user_name', + String(30), nullable=False), schema='test_schema') + addresses = Table( + 'email_addresses', + meta1, + Column('address_id', Integer, primary_key=True), + Column('remote_user_id', Integer, + ForeignKey(users.c.user_id)), + Column('email_address', String(20)), + schema='test_schema', + ) + meta1.create_all() + meta2 = MetaData(testing.db) + addresses = Table('email_addresses', meta2, autoload=True, + schema='test_schema') + users = Table('users', meta2, mustexist=True, + schema='test_schema') + j = join(users, addresses) + self.assert_((users.c.user_id + == addresses.c.remote_user_id).compare(j.onclause)) + + @testing.provide_metadata + def test_schema_reflection_2(self): + meta1 = self.metadata + subject = Table('subject', meta1, Column('id', Integer, + primary_key=True)) + referer = Table('referer', meta1, Column('id', Integer, + primary_key=True), Column('ref', Integer, + ForeignKey('subject.id')), schema='test_schema') + meta1.create_all() + meta2 = MetaData(testing.db) + subject = Table('subject', meta2, autoload=True) + referer = Table('referer', meta2, schema='test_schema', + autoload=True) + self.assert_((subject.c.id + == referer.c.ref).compare( + subject.join(referer).onclause)) + + @testing.provide_metadata + def test_schema_reflection_3(self): + meta1 = self.metadata + subject = Table('subject', meta1, Column('id', Integer, + primary_key=True), schema='test_schema_2') + referer = Table('referer', meta1, Column('id', Integer, + primary_key=True), Column('ref', Integer, + ForeignKey('test_schema_2.subject.id')), + schema='test_schema') + meta1.create_all() + meta2 = MetaData(testing.db) + subject = Table('subject', meta2, autoload=True, + schema='test_schema_2') + referer = Table('referer', meta2, schema='test_schema', + autoload=True) + self.assert_((subject.c.id + == referer.c.ref).compare( + subject.join(referer).onclause)) + + @testing.provide_metadata + def test_uppercase_lowercase_table(self): + metadata = self.metadata + + a_table = Table('a', metadata, Column('x', Integer)) + A_table = Table('A', metadata, Column('x', Integer)) + + a_table.create() + assert testing.db.has_table("a") + assert not testing.db.has_table("A") + A_table.create(checkfirst=True) + assert testing.db.has_table("A") + + def test_uppercase_lowercase_sequence(self): + + a_seq = Sequence('a') + A_seq = Sequence('A') + + a_seq.create(testing.db) + assert testing.db.dialect.has_sequence(testing.db, "a") + assert not testing.db.dialect.has_sequence(testing.db, "A") + A_seq.create(testing.db, checkfirst=True) + assert testing.db.dialect.has_sequence(testing.db, "A") + + a_seq.drop(testing.db) + A_seq.drop(testing.db) + + def test_schema_reflection_multi_search_path(self): + """test the 'set the same schema' rule when + multiple schemas/search paths are in effect.""" + + db = engines.testing_engine() + conn = db.connect() + trans = conn.begin() + try: + conn.execute("set search_path to test_schema_2, " + "test_schema, public") + conn.dialect.default_schema_name = "test_schema_2" + + conn.execute(""" + CREATE TABLE test_schema.some_table ( + id SERIAL not null primary key + ) + """) + + conn.execute(""" + CREATE TABLE test_schema_2.some_other_table ( + id SERIAL not null primary key, + sid INTEGER REFERENCES test_schema.some_table(id) + ) + """) + + m1 = MetaData() + + t2_schema = Table('some_other_table', + m1, + schema="test_schema_2", + autoload=True, + autoload_with=conn) + t1_schema = Table('some_table', + m1, + schema="test_schema", + autoload=True, + autoload_with=conn) + + t2_no_schema = Table('some_other_table', + m1, + autoload=True, + autoload_with=conn) + + t1_no_schema = Table('some_table', + m1, + autoload=True, + autoload_with=conn) + + # OK, this because, "test_schema" is + # in the search path, and might as well be + # the default too. why would we assign + # a "schema" to the Table ? + assert t2_schema.c.sid.references( + t1_no_schema.c.id) + + assert t2_no_schema.c.sid.references( + t1_no_schema.c.id) + + finally: + trans.rollback() + conn.close() + db.dispose() + + @testing.provide_metadata + def test_index_reflection(self): + """ Reflecting partial & expression-based indexes should warn + """ + + metadata = self.metadata + + t1 = Table('party', metadata, Column('id', String(10), + nullable=False), Column('name', String(20), + index=True), Column('aname', String(20))) + metadata.create_all() + testing.db.execute(""" + create index idx1 on party ((id || name)) + """) + testing.db.execute(""" + create unique index idx2 on party (id) where name = 'test' + """) + testing.db.execute(""" + create index idx3 on party using btree + (lower(name::text), lower(aname::text)) + """) + + def go(): + m2 = MetaData(testing.db) + t2 = Table('party', m2, autoload=True) + assert len(t2.indexes) == 2 + + # Make sure indexes are in the order we expect them in + + tmp = [(idx.name, idx) for idx in t2.indexes] + tmp.sort() + r1, r2 = [idx[1] for idx in tmp] + assert r1.name == 'idx2' + assert r1.unique == True + assert r2.unique == False + assert [t2.c.id] == r1.columns + assert [t2.c.name] == r2.columns + + testing.assert_warnings(go, + [ + 'Skipped unsupported reflection of ' + 'expression-based index idx1', + 'Predicate of partial index idx2 ignored during ' + 'reflection', + 'Skipped unsupported reflection of ' + 'expression-based index idx3' + ]) + + @testing.provide_metadata + def test_index_reflection_modified(self): + """reflect indexes when a column name has changed - PG 9 + does not update the name of the column in the index def. + [ticket:2141] + + """ + + metadata = self.metadata + + t1 = Table('t', metadata, + Column('id', Integer, primary_key=True), + Column('x', Integer) + ) + metadata.create_all() + conn = testing.db.connect().execution_options(autocommit=True) + conn.execute("CREATE INDEX idx1 ON t (x)") + conn.execute("ALTER TABLE t RENAME COLUMN x to y") + + ind = testing.db.dialect.get_indexes(conn, "t", None) + eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}]) + conn.close() + +class CustomTypeReflectionTest(fixtures.TestBase): + + class CustomType(object): + def __init__(self, arg1=None, arg2=None): + self.arg1 = arg1 + self.arg2 = arg2 + + ischema_names = None + + def setup(self): + ischema_names = postgresql.PGDialect.ischema_names + postgresql.PGDialect.ischema_names = ischema_names.copy() + self.ischema_names = ischema_names + + def teardown(self): + postgresql.PGDialect.ischema_names = self.ischema_names + self.ischema_names = None + + def _assert_reflected(self, dialect): + for sch, args in [ + ('my_custom_type', (None, None)), + ('my_custom_type()', (None, None)), + ('my_custom_type(ARG1)', ('ARG1', None)), + ('my_custom_type(ARG1, ARG2)', ('ARG1', 'ARG2')), + ]: + column_info = dialect._get_column_info( + 'colname', sch, None, False, + {}, {}, 'public') + assert isinstance(column_info['type'], self.CustomType) + eq_(column_info['type'].arg1, args[0]) + eq_(column_info['type'].arg2, args[1]) + + def test_clslevel(self): + postgresql.PGDialect.ischema_names['my_custom_type'] = self.CustomType + dialect = postgresql.PGDialect() + self._assert_reflected(dialect) + + def test_instancelevel(self): + dialect = postgresql.PGDialect() + dialect.ischema_names = dialect.ischema_names.copy() + dialect.ischema_names['my_custom_type'] = self.CustomType + self._assert_reflected(dialect) diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py new file mode 100644 index 000000000..784f8bcbf --- /dev/null +++ b/test/dialect/postgresql/test_types.py @@ -0,0 +1,1679 @@ +# coding: utf-8 +from sqlalchemy.testing.assertions import eq_, assert_raises, \ + assert_raises_message, is_, AssertsExecutionResults, \ + AssertsCompiledSQL, ComparesTables +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +import datetime +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy.orm import Session, mapper, aliased +from sqlalchemy import exc, schema, types +from sqlalchemy.dialects.postgresql import base as postgresql +from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \ + INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE +import decimal +from sqlalchemy import util +from sqlalchemy.testing.util import round_decimal +from sqlalchemy.sql import table, column, operators +import logging +import re + +class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): + __only_on__ = 'postgresql' + __dialect__ = postgresql.dialect() + + @classmethod + def define_tables(cls, metadata): + data_table = Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', Integer) + ) + + @classmethod + def insert_data(cls): + data_table = cls.tables.data_table + + data_table.insert().execute( + {'data':3}, + {'data':5}, + {'data':7}, + {'data':2}, + {'data':15}, + {'data':12}, + {'data':6}, + {'data':478}, + {'data':52}, + {'data':9}, + ) + + @testing.fails_on('postgresql+zxjdbc', + 'XXX: postgresql+zxjdbc currently returns a Decimal result for Float') + def test_float_coercion(self): + data_table = self.tables.data_table + + for type_, result in [ + (Numeric, decimal.Decimal('140.381230939')), + (Float, 140.381230939), + (Float(asdecimal=True), decimal.Decimal('140.381230939')), + (Numeric(asdecimal=False), 140.381230939), + ]: + ret = testing.db.execute( + select([ + func.stddev_pop(data_table.c.data, type_=type_) + ]) + ).scalar() + + eq_(round_decimal(ret, 9), result) + + ret = testing.db.execute( + select([ + cast(func.stddev_pop(data_table.c.data), type_) + ]) + ).scalar() + eq_(round_decimal(ret, 9), result) + + @testing.fails_on('postgresql+zxjdbc', + 'zxjdbc has no support for PG arrays') + @testing.provide_metadata + def test_arrays(self): + metadata = self.metadata + t1 = Table('t', metadata, + Column('x', postgresql.ARRAY(Float)), + Column('y', postgresql.ARRAY(REAL)), + Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)), + Column('q', postgresql.ARRAY(Numeric)) + ) + metadata.create_all() + t1.insert().execute(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]) + row = t1.select().execute().first() + eq_( + row, + ([5], [5], [6], [decimal.Decimal("6.4")]) + ) + +class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): + + __only_on__ = 'postgresql' + __dialect__ = postgresql.dialect() + + def test_compile(self): + e1 = Enum('x', 'y', 'z', name='somename') + e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') + self.assert_compile(postgresql.CreateEnumType(e1), + "CREATE TYPE somename AS ENUM ('x','y','z')" + ) + self.assert_compile(postgresql.CreateEnumType(e2), + "CREATE TYPE someschema.somename AS ENUM " + "('x','y','z')") + self.assert_compile(postgresql.DropEnumType(e1), + 'DROP TYPE somename') + self.assert_compile(postgresql.DropEnumType(e2), + 'DROP TYPE someschema.somename') + t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) + self.assert_compile(schema.CreateTable(t1), + 'CREATE TABLE sometable (somecolumn ' + 'somename)') + t1 = Table('sometable', MetaData(), Column('somecolumn', + Enum('x', 'y', 'z', native_enum=False))) + self.assert_compile(schema.CreateTable(t1), + "CREATE TABLE sometable (somecolumn " + "VARCHAR(1), CHECK (somecolumn IN ('x', " + "'y', 'z')))") + + @testing.fails_on('postgresql+zxjdbc', + 'zxjdbc fails on ENUM: column "XXX" is of type ' + 'XXX but expression is of type character varying') + @testing.fails_on('postgresql+pg8000', + 'zxjdbc fails on ENUM: column "XXX" is of type ' + 'XXX but expression is of type text') + def test_create_table(self): + metadata = MetaData(testing.db) + t1 = Table('table', metadata, Column('id', Integer, + primary_key=True), Column('value', Enum('one', 'two' + , 'three', name='onetwothreetype'))) + t1.create() + t1.create(checkfirst=True) # check the create + try: + t1.insert().execute(value='two') + t1.insert().execute(value='three') + t1.insert().execute(value='three') + eq_(t1.select().order_by(t1.c.id).execute().fetchall(), + [(1, 'two'), (2, 'three'), (3, 'three')]) + finally: + metadata.drop_all() + metadata.drop_all() + + def test_name_required(self): + metadata = MetaData(testing.db) + etype = Enum('four', 'five', 'six', metadata=metadata) + assert_raises(exc.CompileError, etype.create) + assert_raises(exc.CompileError, etype.compile, + dialect=postgresql.dialect()) + + @testing.fails_on('postgresql+zxjdbc', + 'zxjdbc fails on ENUM: column "XXX" is of type ' + 'XXX but expression is of type character varying') + @testing.fails_on('postgresql+pg8000', + 'zxjdbc fails on ENUM: column "XXX" is of type ' + 'XXX but expression is of type text') + @testing.provide_metadata + def test_unicode_labels(self): + metadata = self.metadata + t1 = Table('table', metadata, + Column('id', Integer, primary_key=True), + Column('value', + Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'), + name='onetwothreetype')) + ) + metadata.create_all() + t1.insert().execute(value=util.u('drôle')) + t1.insert().execute(value=util.u('réveillé')) + t1.insert().execute(value=util.u('S’il')) + eq_(t1.select().order_by(t1.c.id).execute().fetchall(), + [(1, util.u('drôle')), (2, util.u('réveillé')), + (3, util.u('S’il'))] + ) + m2 = MetaData(testing.db) + t2 = Table('table', m2, autoload=True) + eq_( + t2.c.value.type.enums, + (util.u('réveillé'), util.u('drôle'), util.u('S’il')) + ) + + def test_non_native_type(self): + metadata = MetaData() + t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', + 'three', name='myenum', native_enum=False))) + + def go(): + t1.create(testing.db) + + try: + self.assert_sql(testing.db, go, [], + with_sequences=[("CREATE TABLE foo (\tbar " + "VARCHAR(5), \tCONSTRAINT myenum CHECK " + "(bar IN ('one', 'two', 'three')))", {})]) + finally: + metadata.drop_all(testing.db) + + @testing.provide_metadata + def test_disable_create(self): + metadata = self.metadata + + e1 = postgresql.ENUM('one', 'two', 'three', + name="myenum", + create_type=False) + + t1 = Table('e1', metadata, + Column('c1', e1) + ) + # table can be created separately + # without conflict + e1.create(bind=testing.db) + t1.create(testing.db) + t1.drop(testing.db) + e1.drop(bind=testing.db) + + @testing.provide_metadata + def test_generate_multiple(self): + """Test that the same enum twice only generates once + for the create_all() call, without using checkfirst. + + A 'memo' collection held by the DDL runner + now handles this. + + """ + metadata = self.metadata + + e1 = Enum('one', 'two', 'three', + name="myenum") + t1 = Table('e1', metadata, + Column('c1', e1) + ) + + t2 = Table('e2', metadata, + Column('c1', e1) + ) + + metadata.create_all(checkfirst=False) + metadata.drop_all(checkfirst=False) + + def test_non_native_dialect(self): + engine = engines.testing_engine() + engine.connect() + engine.dialect.supports_native_enum = False + metadata = MetaData() + t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', + 'three', name='myenum'))) + + def go(): + t1.create(engine) + + try: + self.assert_sql(engine, go, [], + with_sequences=[("CREATE TABLE foo (\tbar " + "VARCHAR(5), \tCONSTRAINT myenum CHECK " + "(bar IN ('one', 'two', 'three')))", {})]) + finally: + metadata.drop_all(engine) + + def test_standalone_enum(self): + metadata = MetaData(testing.db) + etype = Enum('four', 'five', 'six', name='fourfivesixtype', + metadata=metadata) + etype.create() + try: + assert testing.db.dialect.has_type(testing.db, + 'fourfivesixtype') + finally: + etype.drop() + assert not testing.db.dialect.has_type(testing.db, + 'fourfivesixtype') + metadata.create_all() + try: + assert testing.db.dialect.has_type(testing.db, + 'fourfivesixtype') + finally: + metadata.drop_all() + assert not testing.db.dialect.has_type(testing.db, + 'fourfivesixtype') + + def test_no_support(self): + def server_version_info(self): + return (8, 2) + + e = engines.testing_engine() + dialect = e.dialect + dialect._get_server_version_info = server_version_info + + assert dialect.supports_native_enum + e.connect() + assert not dialect.supports_native_enum + + # initialize is called again on new pool + e.dispose() + e.connect() + assert not dialect.supports_native_enum + + + def test_reflection(self): + metadata = MetaData(testing.db) + etype = Enum('four', 'five', 'six', name='fourfivesixtype', + metadata=metadata) + t1 = Table('table', metadata, Column('id', Integer, + primary_key=True), Column('value', Enum('one', 'two' + , 'three', name='onetwothreetype')), Column('value2' + , etype)) + metadata.create_all() + try: + m2 = MetaData(testing.db) + t2 = Table('table', m2, autoload=True) + assert t2.c.value.type.enums == ('one', 'two', 'three') + assert t2.c.value.type.name == 'onetwothreetype' + assert t2.c.value2.type.enums == ('four', 'five', 'six') + assert t2.c.value2.type.name == 'fourfivesixtype' + finally: + metadata.drop_all() + + def test_schema_reflection(self): + metadata = MetaData(testing.db) + etype = Enum( + 'four', + 'five', + 'six', + name='fourfivesixtype', + schema='test_schema', + metadata=metadata, + ) + t1 = Table('table', metadata, Column('id', Integer, + primary_key=True), Column('value', Enum('one', 'two' + , 'three', name='onetwothreetype', + schema='test_schema')), Column('value2', etype)) + metadata.create_all() + try: + m2 = MetaData(testing.db) + t2 = Table('table', m2, autoload=True) + assert t2.c.value.type.enums == ('one', 'two', 'three') + assert t2.c.value.type.name == 'onetwothreetype' + assert t2.c.value2.type.enums == ('four', 'five', 'six') + assert t2.c.value2.type.name == 'fourfivesixtype' + assert t2.c.value2.type.schema == 'test_schema' + finally: + metadata.drop_all() + +class NumericInterpretationTest(fixtures.TestBase): + __only_on__ = 'postgresql' + + def test_numeric_codes(self): + from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base + + for dialect in (pg8000.dialect(), psycopg2.dialect()): + + typ = Numeric().dialect_impl(dialect) + for code in base._INT_TYPES + base._FLOAT_TYPES + \ + base._DECIMAL_TYPES: + proc = typ.result_processor(dialect, code) + val = 23.7 + if proc is not None: + val = proc(val) + assert val in (23.7, decimal.Decimal("23.7")) + + @testing.provide_metadata + def test_numeric_default(self): + metadata = self.metadata + # pg8000 appears to fail when the value is 0, + # returns an int instead of decimal. + t =Table('t', metadata, + Column('id', Integer, primary_key=True), + Column('nd', Numeric(asdecimal=True), default=1), + Column('nf', Numeric(asdecimal=False), default=1), + Column('fd', Float(asdecimal=True), default=1), + Column('ff', Float(asdecimal=False), default=1), + ) + metadata.create_all() + r = t.insert().execute() + + row = t.select().execute().first() + assert isinstance(row[1], decimal.Decimal) + assert isinstance(row[2], float) + assert isinstance(row[3], decimal.Decimal) + assert isinstance(row[4], float) + eq_( + row, + (1, decimal.Decimal("1"), 1, decimal.Decimal("1"), 1) + ) + +class TimezoneTest(fixtures.TestBase): + + """Test timezone-aware datetimes. + + psycopg will return a datetime with a tzinfo attached to it, if + postgresql returns it. python then will not let you compare a + datetime with a tzinfo to a datetime that doesnt have one. this + test illustrates two ways to have datetime types with and without + timezone info. """ + + __only_on__ = 'postgresql' + + @classmethod + def setup_class(cls): + global tztable, notztable, metadata + metadata = MetaData(testing.db) + + # current_timestamp() in postgresql is assumed to return + # TIMESTAMP WITH TIMEZONE + + tztable = Table('tztable', metadata, Column('id', Integer, + primary_key=True), Column('date', + DateTime(timezone=True), + onupdate=func.current_timestamp()), + Column('name', String(20))) + notztable = Table('notztable', metadata, Column('id', Integer, + primary_key=True), Column('date', + DateTime(timezone=False), + onupdate=cast(func.current_timestamp(), + DateTime(timezone=False))), Column('name', + String(20))) + metadata.create_all() + + @classmethod + def teardown_class(cls): + metadata.drop_all() + + @testing.fails_on('postgresql+zxjdbc', + "XXX: postgresql+zxjdbc doesn't give a tzinfo back") + def test_with_timezone(self): + + # get a date with a tzinfo + + somedate = \ + testing.db.connect().scalar(func.current_timestamp().select()) + assert somedate.tzinfo + tztable.insert().execute(id=1, name='row1', date=somedate) + row = select([tztable.c.date], tztable.c.id + == 1).execute().first() + eq_(row[0], somedate) + eq_(somedate.tzinfo.utcoffset(somedate), + row[0].tzinfo.utcoffset(row[0])) + result = tztable.update(tztable.c.id + == 1).returning(tztable.c.date).\ + execute(name='newname' + ) + row = result.first() + assert row[0] >= somedate + + def test_without_timezone(self): + + # get a date without a tzinfo + + somedate = datetime.datetime( 2005, 10, 20, 11, 52, 0, ) + assert not somedate.tzinfo + notztable.insert().execute(id=1, name='row1', date=somedate) + row = select([notztable.c.date], notztable.c.id + == 1).execute().first() + eq_(row[0], somedate) + eq_(row[0].tzinfo, None) + result = notztable.update(notztable.c.id + == 1).returning(notztable.c.date).\ + execute(name='newname' + ) + row = result.first() + assert row[0] >= somedate + +class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): + + __dialect__ = postgresql.dialect() + + def test_compile(self): + for type_, expected in [ + (postgresql.TIME(), 'TIME WITHOUT TIME ZONE'), + (postgresql.TIME(precision=5), 'TIME(5) WITHOUT TIME ZONE' + ), + (postgresql.TIME(timezone=True, precision=5), + 'TIME(5) WITH TIME ZONE'), + (postgresql.TIMESTAMP(), 'TIMESTAMP WITHOUT TIME ZONE'), + (postgresql.TIMESTAMP(precision=5), + 'TIMESTAMP(5) WITHOUT TIME ZONE'), + (postgresql.TIMESTAMP(timezone=True, precision=5), + 'TIMESTAMP(5) WITH TIME ZONE'), + ]: + self.assert_compile(type_, expected) + + @testing.only_on('postgresql', 'DB specific feature') + @testing.provide_metadata + def test_reflection(self): + metadata = self.metadata + t1 = Table( + 't1', + metadata, + Column('c1', postgresql.TIME()), + Column('c2', postgresql.TIME(precision=5)), + Column('c3', postgresql.TIME(timezone=True, precision=5)), + Column('c4', postgresql.TIMESTAMP()), + Column('c5', postgresql.TIMESTAMP(precision=5)), + Column('c6', postgresql.TIMESTAMP(timezone=True, + precision=5)), + ) + t1.create() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.precision, None) + eq_(t2.c.c2.type.precision, 5) + eq_(t2.c.c3.type.precision, 5) + eq_(t2.c.c4.type.precision, None) + eq_(t2.c.c5.type.precision, 5) + eq_(t2.c.c6.type.precision, 5) + eq_(t2.c.c1.type.timezone, False) + eq_(t2.c.c2.type.timezone, False) + eq_(t2.c.c3.type.timezone, True) + eq_(t2.c.c4.type.timezone, False) + eq_(t2.c.c5.type.timezone, False) + eq_(t2.c.c6.type.timezone, True) + +class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): + + __only_on__ = 'postgresql' + + __unsupported_on__ = 'postgresql+pg8000', 'postgresql+zxjdbc' + + @classmethod + def define_tables(cls, metadata): + + class ProcValue(TypeDecorator): + impl = postgresql.ARRAY(Integer, dimensions=2) + + def process_bind_param(self, value, dialect): + if value is None: + return None + return [ + [x + 5 for x in v] + for v in value + ] + + def process_result_value(self, value, dialect): + if value is None: + return None + return [ + [x - 7 for x in v] + for v in value + ] + + Table('arrtable', metadata, + Column('id', Integer, primary_key=True), + Column('intarr', postgresql.ARRAY(Integer)), + Column('strarr', postgresql.ARRAY(Unicode())), + Column('dimarr', ProcValue) + ) + + Table('dim_arrtable', metadata, + Column('id', Integer, primary_key=True), + Column('intarr', postgresql.ARRAY(Integer, dimensions=1)), + Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)), + Column('dimarr', ProcValue) + ) + + def _fixture_456(self, table): + testing.db.execute( + table.insert(), + intarr=[4, 5, 6] + ) + + def test_reflect_array_column(self): + metadata2 = MetaData(testing.db) + tbl = Table('arrtable', metadata2, autoload=True) + assert isinstance(tbl.c.intarr.type, postgresql.ARRAY) + assert isinstance(tbl.c.strarr.type, postgresql.ARRAY) + assert isinstance(tbl.c.intarr.type.item_type, Integer) + assert isinstance(tbl.c.strarr.type.item_type, String) + + def test_insert_array(self): + arrtable = self.tables.arrtable + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), + util.u('def')]) + results = arrtable.select().execute().fetchall() + eq_(len(results), 1) + eq_(results[0]['intarr'], [1, 2, 3]) + eq_(results[0]['strarr'], [util.u('abc'), util.u('def')]) + + def test_array_where(self): + arrtable = self.tables.arrtable + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), + util.u('def')]) + arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC')) + results = arrtable.select().where(arrtable.c.intarr == [1, 2, + 3]).execute().fetchall() + eq_(len(results), 1) + eq_(results[0]['intarr'], [1, 2, 3]) + + def test_array_concat(self): + arrtable = self.tables.arrtable + arrtable.insert().execute(intarr=[1, 2, 3], + strarr=[util.u('abc'), util.u('def')]) + results = select([arrtable.c.intarr + [4, 5, + 6]]).execute().fetchall() + eq_(len(results), 1) + eq_(results[0][0], [ 1, 2, 3, 4, 5, 6, ]) + + def test_array_subtype_resultprocessor(self): + arrtable = self.tables.arrtable + arrtable.insert().execute(intarr=[4, 5, 6], + strarr=[[util.ue('m\xe4\xe4')], [ + util.ue('m\xf6\xf6')]]) + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[ + util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) + results = \ + arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall() + eq_(len(results), 2) + eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) + eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]]) + + def test_array_literal(self): + eq_( + testing.db.scalar( + select([ + postgresql.array([1, 2]) + postgresql.array([3, 4, 5]) + ]) + ), [1,2,3,4,5] + ) + + def test_array_getitem_single_type(self): + arrtable = self.tables.arrtable + is_(arrtable.c.intarr[1].type._type_affinity, Integer) + is_(arrtable.c.strarr[1].type._type_affinity, String) + + def test_array_getitem_slice_type(self): + arrtable = self.tables.arrtable + is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY) + is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY) + + def test_array_getitem_single_exec(self): + arrtable = self.tables.arrtable + self._fixture_456(arrtable) + eq_( + testing.db.scalar(select([arrtable.c.intarr[2]])), + 5 + ) + testing.db.execute( + arrtable.update().values({arrtable.c.intarr[2]: 7}) + ) + eq_( + testing.db.scalar(select([arrtable.c.intarr[2]])), + 7 + ) + + def test_undim_array_empty(self): + arrtable = self.tables.arrtable + self._fixture_456(arrtable) + eq_( + testing.db.scalar( + select([arrtable.c.intarr]). + where(arrtable.c.intarr.contains([])) + ), + [4, 5, 6] + ) + + def test_array_getitem_slice_exec(self): + arrtable = self.tables.arrtable + testing.db.execute( + arrtable.insert(), + intarr=[4, 5, 6], + strarr=[util.u('abc'), util.u('def')] + ) + eq_( + testing.db.scalar(select([arrtable.c.intarr[2:3]])), + [5, 6] + ) + testing.db.execute( + arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]}) + ) + eq_( + testing.db.scalar(select([arrtable.c.intarr[2:3]])), + [7, 8] + ) + + + def _test_undim_array_contains_typed_exec(self, struct): + arrtable = self.tables.arrtable + self._fixture_456(arrtable) + eq_( + testing.db.scalar( + select([arrtable.c.intarr]). + where(arrtable.c.intarr.contains(struct([4, 5]))) + ), + [4, 5, 6] + ) + + def test_undim_array_contains_set_exec(self): + self._test_undim_array_contains_typed_exec(set) + + def test_undim_array_contains_list_exec(self): + self._test_undim_array_contains_typed_exec(list) + + def test_undim_array_contains_generator_exec(self): + self._test_undim_array_contains_typed_exec( + lambda elem: (x for x in elem)) + + def _test_dim_array_contains_typed_exec(self, struct): + dim_arrtable = self.tables.dim_arrtable + self._fixture_456(dim_arrtable) + eq_( + testing.db.scalar( + select([dim_arrtable.c.intarr]). + where(dim_arrtable.c.intarr.contains(struct([4, 5]))) + ), + [4, 5, 6] + ) + + def test_dim_array_contains_set_exec(self): + self._test_dim_array_contains_typed_exec(set) + + def test_dim_array_contains_list_exec(self): + self._test_dim_array_contains_typed_exec(list) + + def test_dim_array_contains_generator_exec(self): + self._test_dim_array_contains_typed_exec(lambda elem: (x for x in elem)) + + def test_array_contained_by_exec(self): + arrtable = self.tables.arrtable + with testing.db.connect() as conn: + conn.execute( + arrtable.insert(), + intarr=[6, 5, 4] + ) + eq_( + conn.scalar( + select([arrtable.c.intarr.contained_by([4, 5, 6, 7])]) + ), + True + ) + + def test_array_overlap_exec(self): + arrtable = self.tables.arrtable + with testing.db.connect() as conn: + conn.execute( + arrtable.insert(), + intarr=[4, 5, 6] + ) + eq_( + conn.scalar( + select([arrtable.c.intarr]). + where(arrtable.c.intarr.overlap([7, 6])) + ), + [4, 5, 6] + ) + + def test_array_any_exec(self): + arrtable = self.tables.arrtable + with testing.db.connect() as conn: + conn.execute( + arrtable.insert(), + intarr=[4, 5, 6] + ) + eq_( + conn.scalar( + select([arrtable.c.intarr]). + where(postgresql.Any(5, arrtable.c.intarr)) + ), + [4, 5, 6] + ) + + def test_array_all_exec(self): + arrtable = self.tables.arrtable + with testing.db.connect() as conn: + conn.execute( + arrtable.insert(), + intarr=[4, 5, 6] + ) + eq_( + conn.scalar( + select([arrtable.c.intarr]). + where(arrtable.c.intarr.all(4, operator=operators.le)) + ), + [4, 5, 6] + ) + + + @testing.provide_metadata + def test_tuple_flag(self): + metadata = self.metadata + + t1 = Table('t1', metadata, + Column('id', Integer, primary_key=True), + Column('data', postgresql.ARRAY(String(5), as_tuple=True)), + Column('data2', postgresql.ARRAY(Numeric(asdecimal=False), as_tuple=True)), + ) + metadata.create_all() + testing.db.execute(t1.insert(), id=1, data=["1","2","3"], data2=[5.4, 5.6]) + testing.db.execute(t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0]) + testing.db.execute(t1.insert(), id=3, data=[["4", "5"], ["6", "7"]], + data2=[[5.4, 5.6], [1.0, 1.1]]) + + r = testing.db.execute(t1.select().order_by(t1.c.id)).fetchall() + eq_( + r, + [ + (1, ('1', '2', '3'), (5.4, 5.6)), + (2, ('4', '5', '6'), (1.0,)), + (3, (('4', '5'), ('6', '7')), ((5.4, 5.6), (1.0, 1.1))) + ] + ) + # hashable + eq_( + set(row[1] for row in r), + set([('1', '2', '3'), ('4', '5', '6'), (('4', '5'), ('6', '7'))]) + ) + + def test_dimension(self): + arrtable = self.tables.arrtable + testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4,5, 6]]) + eq_( + testing.db.scalar(select([arrtable.c.dimarr])), + [[-1, 0, 1], [2, 3, 4]] + ) + +class TimestampTest(fixtures.TestBase, AssertsExecutionResults): + __only_on__ = 'postgresql' + + def test_timestamp(self): + engine = testing.db + connection = engine.connect() + + s = select(["timestamp '2007-12-25'"]) + result = connection.execute(s).first() + eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0)) + + +class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): + """test DDL and reflection of PG-specific types """ + + __only_on__ = 'postgresql' + __excluded_on__ = (('postgresql', '<', (8, 3, 0)),) + + @classmethod + def setup_class(cls): + global metadata, table + metadata = MetaData(testing.db) + + # create these types so that we can issue + # special SQL92 INTERVAL syntax + class y2m(types.UserDefinedType, postgresql.INTERVAL): + def get_col_spec(self): + return "INTERVAL YEAR TO MONTH" + + class d2s(types.UserDefinedType, postgresql.INTERVAL): + def get_col_spec(self): + return "INTERVAL DAY TO SECOND" + + table = Table('sometable', metadata, + Column('id', postgresql.UUID, primary_key=True), + Column('flag', postgresql.BIT), + Column('bitstring', postgresql.BIT(4)), + Column('addr', postgresql.INET), + Column('addr2', postgresql.MACADDR), + Column('addr3', postgresql.CIDR), + Column('doubleprec', postgresql.DOUBLE_PRECISION), + Column('plain_interval', postgresql.INTERVAL), + Column('year_interval', y2m()), + Column('month_interval', d2s()), + Column('precision_interval', postgresql.INTERVAL(precision=3)) + ) + + metadata.create_all() + + # cheat so that the "strict type check" + # works + table.c.year_interval.type = postgresql.INTERVAL() + table.c.month_interval.type = postgresql.INTERVAL() + + @classmethod + def teardown_class(cls): + metadata.drop_all() + + def test_reflection(self): + m = MetaData(testing.db) + t = Table('sometable', m, autoload=True) + + self.assert_tables_equal(table, t, strict_types=True) + assert t.c.plain_interval.type.precision is None + assert t.c.precision_interval.type.precision == 3 + assert t.c.bitstring.type.length == 4 + + def test_bit_compile(self): + pairs = [(postgresql.BIT(), 'BIT(1)'), + (postgresql.BIT(5), 'BIT(5)'), + (postgresql.BIT(varying=True), 'BIT VARYING'), + (postgresql.BIT(5, varying=True), 'BIT VARYING(5)'), + ] + for type_, expected in pairs: + self.assert_compile(type_, expected) + + @testing.provide_metadata + def test_bit_reflection(self): + metadata = self.metadata + t1 = Table('t1', metadata, + Column('bit1', postgresql.BIT()), + Column('bit5', postgresql.BIT(5)), + Column('bitvarying', postgresql.BIT(varying=True)), + Column('bitvarying5', postgresql.BIT(5, varying=True)), + ) + t1.create() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.bit1.type.length, 1) + eq_(t2.c.bit1.type.varying, False) + eq_(t2.c.bit5.type.length, 5) + eq_(t2.c.bit5.type.varying, False) + eq_(t2.c.bitvarying.type.length, None) + eq_(t2.c.bitvarying.type.varying, True) + eq_(t2.c.bitvarying5.type.length, 5) + eq_(t2.c.bitvarying5.type.varying, True) + +class UUIDTest(fixtures.TestBase): + """Test the bind/return values of the UUID type.""" + + __only_on__ = 'postgresql' + + @testing.requires.python25 + @testing.fails_on('postgresql+zxjdbc', + 'column "data" is of type uuid but expression is of type character varying') + @testing.fails_on('postgresql+pg8000', 'No support for UUID type') + def test_uuid_string(self): + import uuid + self._test_round_trip( + Table('utable', MetaData(), + Column('data', postgresql.UUID()) + ), + str(uuid.uuid4()), + str(uuid.uuid4()) + ) + + @testing.requires.python25 + @testing.fails_on('postgresql+zxjdbc', + 'column "data" is of type uuid but expression is of type character varying') + @testing.fails_on('postgresql+pg8000', 'No support for UUID type') + def test_uuid_uuid(self): + import uuid + self._test_round_trip( + Table('utable', MetaData(), + Column('data', postgresql.UUID(as_uuid=True)) + ), + uuid.uuid4(), + uuid.uuid4() + ) + + def test_no_uuid_available(self): + from sqlalchemy.dialects.postgresql import base + uuid_type = base._python_UUID + base._python_UUID = None + try: + assert_raises( + NotImplementedError, + postgresql.UUID, as_uuid=True + ) + finally: + base._python_UUID = uuid_type + + def setup(self): + self.conn = testing.db.connect() + trans = self.conn.begin() + + def teardown(self): + self.conn.close() + + def _test_round_trip(self, utable, value1, value2): + utable.create(self.conn) + self.conn.execute(utable.insert(), {'data':value1}) + self.conn.execute(utable.insert(), {'data':value2}) + r = self.conn.execute( + select([utable.c.data]). + where(utable.c.data != value1) + ) + eq_(r.fetchone()[0], value2) + eq_(r.fetchone(), None) + + + +class HStoreTest(fixtures.TestBase): + def _assert_sql(self, construct, expected): + dialect = postgresql.dialect() + compiled = str(construct.compile(dialect=dialect)) + compiled = re.sub(r'\s+', ' ', compiled) + expected = re.sub(r'\s+', ' ', expected) + eq_(compiled, expected) + + def setup(self): + metadata = MetaData() + self.test_table = Table('test_table', metadata, + Column('id', Integer, primary_key=True), + Column('hash', HSTORE) + ) + self.hashcol = self.test_table.c.hash + + def _test_where(self, whereclause, expected): + stmt = select([self.test_table]).where(whereclause) + self._assert_sql( + stmt, + "SELECT test_table.id, test_table.hash FROM test_table " + "WHERE %s" % expected + ) + + def _test_cols(self, colclause, expected, from_=True): + stmt = select([colclause]) + self._assert_sql( + stmt, + ( + "SELECT %s" + + (" FROM test_table" if from_ else "") + ) % expected + ) + + def test_bind_serialize_default(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + eq_( + proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), + '"key1"=>"value1", "key2"=>"value2"' + ) + + def test_bind_serialize_with_slashes_and_quotes(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + eq_( + proc({'\\"a': '\\"1'}), + '"\\\\\\"a"=>"\\\\\\"1"' + ) + + def test_parse_error(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + assert_raises_message( + ValueError, + r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse ''' + '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''', + proc, + '"key2"=>"value2", "key1"=>"value1", ' + 'crapcrapcrap, "key3"=>"value3"' + ) + + def test_result_deserialize_default(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + eq_( + proc('"key2"=>"value2", "key1"=>"value1"'), + {"key1": "value1", "key2": "value2"} + ) + + def test_result_deserialize_with_slashes_and_quotes(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + eq_( + proc('"\\\\\\"a"=>"\\\\\\"1"'), + {'\\"a': '\\"1'} + ) + + def test_bind_serialize_psycopg2(self): + from sqlalchemy.dialects.postgresql import psycopg2 + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = True + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + is_(proc, None) + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = False + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + eq_( + proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), + '"key1"=>"value1", "key2"=>"value2"' + ) + + def test_result_deserialize_psycopg2(self): + from sqlalchemy.dialects.postgresql import psycopg2 + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = True + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + is_(proc, None) + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = False + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + eq_( + proc('"key2"=>"value2", "key1"=>"value1"'), + {"key1": "value1", "key2": "value2"} + ) + + def test_where_has_key(self): + self._test_where( + # hide from 2to3 + getattr(self.hashcol, 'has_key')('foo'), + "test_table.hash ? %(hash_1)s" + ) + + def test_where_has_all(self): + self._test_where( + self.hashcol.has_all(postgresql.array(['1', '2'])), + "test_table.hash ?& ARRAY[%(param_1)s, %(param_2)s]" + ) + + def test_where_has_any(self): + self._test_where( + self.hashcol.has_any(postgresql.array(['1', '2'])), + "test_table.hash ?| ARRAY[%(param_1)s, %(param_2)s]" + ) + + def test_where_defined(self): + self._test_where( + self.hashcol.defined('foo'), + "defined(test_table.hash, %(param_1)s)" + ) + + def test_where_contains(self): + self._test_where( + self.hashcol.contains({'foo': '1'}), + "test_table.hash @> %(hash_1)s" + ) + + def test_where_contained_by(self): + self._test_where( + self.hashcol.contained_by({'foo': '1', 'bar': None}), + "test_table.hash <@ %(hash_1)s" + ) + + def test_where_getitem(self): + self._test_where( + self.hashcol['bar'] == None, + "(test_table.hash -> %(hash_1)s) IS NULL" + ) + + def test_cols_get(self): + self._test_cols( + self.hashcol['foo'], + "test_table.hash -> %(hash_1)s AS anon_1", + True + ) + + def test_cols_delete_single_key(self): + self._test_cols( + self.hashcol.delete('foo'), + "delete(test_table.hash, %(param_1)s) AS delete_1", + True + ) + + def test_cols_delete_array_of_keys(self): + self._test_cols( + self.hashcol.delete(postgresql.array(['foo', 'bar'])), + ("delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) " + "AS delete_1"), + True + ) + + def test_cols_delete_matching_pairs(self): + self._test_cols( + self.hashcol.delete(hstore('1', '2')), + ("delete(test_table.hash, hstore(%(param_1)s, %(param_2)s)) " + "AS delete_1"), + True + ) + + def test_cols_slice(self): + self._test_cols( + self.hashcol.slice(postgresql.array(['1', '2'])), + ("slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) " + "AS slice_1"), + True + ) + + def test_cols_hstore_pair_text(self): + self._test_cols( + hstore('foo', '3')['foo'], + "hstore(%(param_1)s, %(param_2)s) -> %(hstore_1)s AS anon_1", + False + ) + + def test_cols_hstore_pair_array(self): + self._test_cols( + hstore(postgresql.array(['1', '2']), + postgresql.array(['3', None]))['1'], + ("hstore(ARRAY[%(param_1)s, %(param_2)s], " + "ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"), + False + ) + + def test_cols_hstore_single_array(self): + self._test_cols( + hstore(postgresql.array(['1', '2', '3', None]))['3'], + ("hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) " + "-> %(hstore_1)s AS anon_1"), + False + ) + + def test_cols_concat(self): + self._test_cols( + self.hashcol.concat(hstore(cast(self.test_table.c.id, Text), '3')), + ("test_table.hash || hstore(CAST(test_table.id AS TEXT), " + "%(param_1)s) AS anon_1"), + True + ) + + def test_cols_concat_op(self): + self._test_cols( + hstore('foo', 'bar') + self.hashcol, + "hstore(%(param_1)s, %(param_2)s) || test_table.hash AS anon_1", + True + ) + + def test_cols_concat_get(self): + self._test_cols( + (self.hashcol + self.hashcol)['foo'], + "test_table.hash || test_table.hash -> %(param_1)s AS anon_1" + ) + + def test_cols_keys(self): + self._test_cols( + # hide from 2to3 + getattr(self.hashcol, 'keys')(), + "akeys(test_table.hash) AS akeys_1", + True + ) + + def test_cols_vals(self): + self._test_cols( + self.hashcol.vals(), + "avals(test_table.hash) AS avals_1", + True + ) + + def test_cols_array(self): + self._test_cols( + self.hashcol.array(), + "hstore_to_array(test_table.hash) AS hstore_to_array_1", + True + ) + + def test_cols_matrix(self): + self._test_cols( + self.hashcol.matrix(), + "hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1", + True + ) + + +class HStoreRoundTripTest(fixtures.TablesTest): + __requires__ = 'hstore', + __dialect__ = 'postgresql' + + @classmethod + def define_tables(cls, metadata): + Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', HSTORE) + ) + + def _fixture_data(self, engine): + data_table = self.tables.data_table + engine.execute( + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}}, + ) + + def _assert_data(self, compare): + data = testing.db.execute( + select([self.tables.data_table.c.data]). + order_by(self.tables.data_table.c.name) + ).fetchall() + eq_([d for d, in data], compare) + + def _test_insert(self, engine): + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} + ) + self._assert_data([{"k1": "r1v1", "k2": "r1v2"}]) + + def _non_native_engine(self): + if testing.against("postgresql+psycopg2"): + engine = engines.testing_engine(options=dict(use_native_hstore=False)) + else: + engine = testing.db + engine.connect() + return engine + + def test_reflect(self): + from sqlalchemy import inspect + insp = inspect(testing.db) + cols = insp.get_columns('data_table') + assert isinstance(cols[2]['type'], HSTORE) + + @testing.only_on("postgresql+psycopg2") + def test_insert_native(self): + engine = testing.db + self._test_insert(engine) + + def test_insert_python(self): + engine = self._non_native_engine() + self._test_insert(engine) + + @testing.only_on("postgresql+psycopg2") + def test_criterion_native(self): + engine = testing.db + self._fixture_data(engine) + self._test_criterion(engine) + + def test_criterion_python(self): + engine = self._non_native_engine() + self._fixture_data(engine) + self._test_criterion(engine) + + def _test_criterion(self, engine): + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1') + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) + + def _test_fixed_round_trip(self, engine): + s = select([ + hstore( + array(['key1', 'key2', 'key3']), + array(['value1', 'value2', 'value3']) + ) + ]) + eq_( + engine.scalar(s), + {"key1": "value1", "key2": "value2", "key3": "value3"} + ) + + def test_fixed_round_trip_python(self): + engine = self._non_native_engine() + self._test_fixed_round_trip(engine) + + @testing.only_on("postgresql+psycopg2") + def test_fixed_round_trip_native(self): + engine = testing.db + self._test_fixed_round_trip(engine) + + def _test_unicode_round_trip(self, engine): + s = select([ + hstore( + array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]), + array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]) + ) + ]) + eq_( + engine.scalar(s), + { + util.u('réveillé'): util.u('réveillé'), + util.u('drôle'): util.u('drôle'), + util.u('S’il'): util.u('S’il') + } + ) + + def test_unicode_round_trip_python(self): + engine = self._non_native_engine() + self._test_unicode_round_trip(engine) + + @testing.only_on("postgresql+psycopg2") + def test_unicode_round_trip_native(self): + engine = testing.db + self._test_unicode_round_trip(engine) + + def test_escaped_quotes_round_trip_python(self): + engine = self._non_native_engine() + self._test_escaped_quotes_round_trip(engine) + + @testing.only_on("postgresql+psycopg2") + def test_escaped_quotes_round_trip_native(self): + engine = testing.db + self._test_escaped_quotes_round_trip(engine) + + def _test_escaped_quotes_round_trip(self, engine): + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {r'key \"foo\"': r'value \"bar"\ xyz'}} + ) + self._assert_data([{r'key \"foo\"': r'value \"bar"\ xyz'}]) + +class _RangeTypeMixin(object): + __requires__ = 'range_types', + __dialect__ = 'postgresql+psycopg2' + + @property + def extras(self): + # done this way so we don't get ImportErrors with + # older psycopg2 versions. + from psycopg2 import extras + return extras + + @classmethod + def define_tables(cls, metadata): + # no reason ranges shouldn't be primary keys, + # so lets just use them as such + table = Table('data_table', metadata, + Column('range', cls._col_type, primary_key=True), + ) + cls.col = table.c.range + + def test_actual_type(self): + eq_(str(self._col_type()), self._col_str) + + def test_reflect(self): + from sqlalchemy import inspect + insp = inspect(testing.db) + cols = insp.get_columns('data_table') + assert isinstance(cols[0]['type'], self._col_type) + + def _assert_data(self): + data = testing.db.execute( + select([self.tables.data_table.c.range]) + ).fetchall() + eq_(data, [(self._data_obj(), )]) + + def test_insert_obj(self): + testing.db.engine.execute( + self.tables.data_table.insert(), + {'range': self._data_obj()} + ) + self._assert_data() + + def test_insert_text(self): + testing.db.engine.execute( + self.tables.data_table.insert(), + {'range': self._data_str} + ) + self._assert_data() + + # operator tests + + def _test_clause(self, colclause, expected): + dialect = postgresql.dialect() + compiled = str(colclause.compile(dialect=dialect)) + eq_(compiled, expected) + + def test_where_equal(self): + self._test_clause( + self.col==self._data_str, + "data_table.range = %(range_1)s" + ) + + def test_where_not_equal(self): + self._test_clause( + self.col!=self._data_str, + "data_table.range <> %(range_1)s" + ) + + def test_where_less_than(self): + self._test_clause( + self.col < self._data_str, + "data_table.range < %(range_1)s" + ) + + def test_where_greater_than(self): + self._test_clause( + self.col > self._data_str, + "data_table.range > %(range_1)s" + ) + + def test_where_less_than_or_equal(self): + self._test_clause( + self.col <= self._data_str, + "data_table.range <= %(range_1)s" + ) + + def test_where_greater_than_or_equal(self): + self._test_clause( + self.col >= self._data_str, + "data_table.range >= %(range_1)s" + ) + + def test_contains(self): + self._test_clause( + self.col.contains(self._data_str), + "data_table.range @> %(range_1)s" + ) + + def test_contained_by(self): + self._test_clause( + self.col.contained_by(self._data_str), + "data_table.range <@ %(range_1)s" + ) + + def test_overlaps(self): + self._test_clause( + self.col.overlaps(self._data_str), + "data_table.range && %(range_1)s" + ) + + def test_strictly_left_of(self): + self._test_clause( + self.col << self._data_str, + "data_table.range << %(range_1)s" + ) + self._test_clause( + self.col.strictly_left_of(self._data_str), + "data_table.range << %(range_1)s" + ) + + def test_strictly_right_of(self): + self._test_clause( + self.col >> self._data_str, + "data_table.range >> %(range_1)s" + ) + self._test_clause( + self.col.strictly_right_of(self._data_str), + "data_table.range >> %(range_1)s" + ) + + def test_not_extend_right_of(self): + self._test_clause( + self.col.not_extend_right_of(self._data_str), + "data_table.range &< %(range_1)s" + ) + + def test_not_extend_left_of(self): + self._test_clause( + self.col.not_extend_left_of(self._data_str), + "data_table.range &> %(range_1)s" + ) + + def test_adjacent_to(self): + self._test_clause( + self.col.adjacent_to(self._data_str), + "data_table.range -|- %(range_1)s" + ) + + def test_union(self): + self._test_clause( + self.col + self.col, + "data_table.range + data_table.range" + ) + + def test_union_result(self): + # insert + testing.db.engine.execute( + self.tables.data_table.insert(), + {'range': self._data_str} + ) + # select + range = self.tables.data_table.c.range + data = testing.db.execute( + select([range + range]) + ).fetchall() + eq_(data, [(self._data_obj(), )]) + + + def test_intersection(self): + self._test_clause( + self.col * self.col, + "data_table.range * data_table.range" + ) + + def test_intersection_result(self): + # insert + testing.db.engine.execute( + self.tables.data_table.insert(), + {'range': self._data_str} + ) + # select + range = self.tables.data_table.c.range + data = testing.db.execute( + select([range * range]) + ).fetchall() + eq_(data, [(self._data_obj(), )]) + + def test_different(self): + self._test_clause( + self.col - self.col, + "data_table.range - data_table.range" + ) + + def test_difference_result(self): + # insert + testing.db.engine.execute( + self.tables.data_table.insert(), + {'range': self._data_str} + ) + # select + range = self.tables.data_table.c.range + data = testing.db.execute( + select([range - range]) + ).fetchall() + eq_(data, [(self._data_obj().__class__(empty=True), )]) + +class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = INT4RANGE + _col_str = 'INT4RANGE' + _data_str = '[1,2)' + def _data_obj(self): + return self.extras.NumericRange(1, 2) + +class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = INT8RANGE + _col_str = 'INT8RANGE' + _data_str = '[9223372036854775806,9223372036854775807)' + def _data_obj(self): + return self.extras.NumericRange( + 9223372036854775806, 9223372036854775807 + ) + +class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = NUMRANGE + _col_str = 'NUMRANGE' + _data_str = '[1.0,2.0)' + def _data_obj(self): + return self.extras.NumericRange( + decimal.Decimal('1.0'), decimal.Decimal('2.0') + ) + +class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = DATERANGE + _col_str = 'DATERANGE' + _data_str = '[2013-03-23,2013-03-24)' + def _data_obj(self): + return self.extras.DateRange( + datetime.date(2013, 3, 23), datetime.date(2013, 3, 24) + ) + +class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = TSRANGE + _col_str = 'TSRANGE' + _data_str = '[2013-03-23 14:30,2013-03-23 23:30)' + def _data_obj(self): + return self.extras.DateTimeRange( + datetime.datetime(2013, 3, 23, 14, 30), + datetime.datetime(2013, 3, 23, 23, 30) + ) + +class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest): + + _col_type = TSTZRANGE + _col_str = 'TSTZRANGE' + + # make sure we use one, steady timestamp with timezone pair + # for all parts of all these tests + _tstzs = None + def tstzs(self): + if self._tstzs is None: + lower = testing.db.connect().scalar( + func.current_timestamp().select() + ) + upper = lower+datetime.timedelta(1) + self._tstzs = (lower, upper) + return self._tstzs + + @property + def _data_str(self): + return '[%s,%s)' % self.tstzs() + + def _data_obj(self): + return self.extras.DateTimeTZRange(*self.tstzs()) |
