Diffstat (limited to 'test/dialect/postgresql/test_dialect.py')
-rw-r--r--  test/dialect/postgresql/test_dialect.py  221
1 file changed, 221 insertions, 0 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
new file mode 100644
index 000000000..86ce91dc9
--- /dev/null
+++ b/test/dialect/postgresql/test_dialect.py
@@ -0,0 +1,221 @@
+# coding: utf-8
+
+from sqlalchemy.testing.assertions import eq_, assert_raises, \
+ assert_raises_message, AssertsExecutionResults, \
+ AssertsCompiledSQL
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy import testing
+import datetime
+from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
+ String, Sequence, ForeignKey, join, Numeric, \
+ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
+ func, literal_column, literal, bindparam, cast, extract, \
+ SmallInteger, Enum, REAL, update, insert, Index, delete, \
+ and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
+from sqlalchemy import exc, schema
+from sqlalchemy.dialects.postgresql import base as postgresql
+import logging
+import logging.handlers
+
+class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
+ __only_on__ = 'postgresql'
+
+ @testing.provide_metadata
+ def test_date_reflection(self):
+ metadata = self.metadata
+ t1 = Table('pgdate', metadata, Column('date1',
+ DateTime(timezone=True)), Column('date2',
+ DateTime(timezone=False)))
+ metadata.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('pgdate', m2, autoload=True)
+ assert t2.c.date1.type.timezone is True
+ assert t2.c.date2.type.timezone is False
+
+ @testing.fails_on('+zxjdbc',
+ 'The JDBC driver handles the version parsing')
+ def test_version_parsing(self):
+
+
+ class MockConn(object):
+
+ def __init__(self, res):
+ self.res = res
+
+ def execute(self, str):
+ return self
+
+ def scalar(self):
+ return self.res
+
+
+ for string, version in \
+ [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
+ 'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3,
+ 8)),
+ ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, '
+ 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)),
+ ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, '
+ 'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), '
+ '64-bit', (9, 1, 2))]:
+ eq_(testing.db.dialect._get_server_version_info(MockConn(string)),
+ version)
+
+ @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
+ def test_psycopg2_version(self):
+ v = testing.db.dialect.psycopg2_version
+ assert testing.db.dialect.dbapi.__version__.\
+ startswith(".".join(str(x) for x in v))
+
+ @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
+ def test_notice_logging(self):
+ log = logging.getLogger('sqlalchemy.dialects.postgresql')
+ buf = logging.handlers.BufferingHandler(100)
+ lev = log.level
+ log.addHandler(buf)
+ log.setLevel(logging.INFO)
+ try:
+ conn = testing.db.connect()
+ trans = conn.begin()
+ try:
+ conn.execute('create table foo (id serial primary key)')
+ finally:
+ trans.rollback()
+ finally:
+ log.removeHandler(buf)
+ log.setLevel(lev)
+ msgs = ' '.join(b.msg for b in buf.buffer)
+ assert 'will create implicit sequence' in msgs
+ assert 'will create implicit index' in msgs
+
+ @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
+ @engines.close_open_connections
+ def test_client_encoding(self):
+ c = testing.db.connect()
+ current_encoding = c.connection.connection.encoding
+ c.close()
+
+ # attempt to use an encoding that's not
+ # already set
+ if current_encoding == 'UTF8':
+ test_encoding = 'LATIN1'
+ else:
+ test_encoding = 'UTF8'
+
+ e = engines.testing_engine(
+ options={'client_encoding':test_encoding}
+ )
+ c = e.connect()
+ eq_(c.connection.connection.encoding, test_encoding)
+
+ @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
+ @engines.close_open_connections
+ def test_autocommit_isolation_level(self):
+ extensions = __import__('psycopg2.extensions').extensions
+
+ c = testing.db.connect()
+ c = c.execution_options(isolation_level='AUTOCOMMIT')
+ eq_(c.connection.connection.isolation_level,
+ extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+
+ @testing.fails_on('+zxjdbc',
+ "Can't infer the SQL type to use for an instance "
+ "of org.python.core.PyObjectDerived.")
+ @testing.fails_on('+pg8000', "Can't determine correct type.")
+ def test_extract(self):
+ fivedaysago = datetime.datetime.now() \
+ - datetime.timedelta(days=5)
+ for field, exp in ('year', fivedaysago.year), ('month',
+ fivedaysago.month), ('day', fivedaysago.day):
+ r = testing.db.execute(select([extract(field, func.now()
+ + datetime.timedelta(days=-5))])).scalar()
+ eq_(r, exp)
+
+ def test_checksfor_sequence(self):
+ meta1 = MetaData(testing.db)
+ seq = Sequence('fooseq')
+ t = Table('mytable', meta1, Column('col1', Integer,
+ seq))
+ seq.drop()
+ try:
+ testing.db.execute('CREATE SEQUENCE fooseq')
+ t.create(checkfirst=True)
+ finally:
+ t.drop(checkfirst=True)
+
+ def test_schema_roundtrips(self):
+ meta = MetaData(testing.db)
+ users = Table('users', meta, Column('id', Integer,
+ primary_key=True), Column('name', String(50)),
+ schema='test_schema')
+ users.create()
+ try:
+ users.insert().execute(id=1, name='name1')
+ users.insert().execute(id=2, name='name2')
+ users.insert().execute(id=3, name='name3')
+ users.insert().execute(id=4, name='name4')
+ eq_(users.select().where(users.c.name == 'name2'
+ ).execute().fetchall(), [(2, 'name2')])
+ eq_(users.select(use_labels=True).where(users.c.name
+ == 'name2').execute().fetchall(), [(2, 'name2')])
+ users.delete().where(users.c.id == 3).execute()
+ eq_(users.select().where(users.c.name == 'name3'
+ ).execute().fetchall(), [])
+ users.update().where(users.c.name == 'name4'
+ ).execute(name='newname')
+ eq_(users.select(use_labels=True).where(users.c.id
+ == 4).execute().fetchall(), [(4, 'newname')])
+ finally:
+ users.drop()
+
+ def test_preexecute_passivedefault(self):
+ """test that when we get a primary key column back from
+ reflecting a table which has a default value on it, we pre-
+ execute that DefaultClause upon insert."""
+
+ try:
+ meta = MetaData(testing.db)
+ testing.db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """)
+ t = Table('speedy_users', meta, autoload=True)
+ r = t.insert().execute(user_name='user',
+ user_password='lala')
+ assert r.inserted_primary_key == [1]
+ l = t.select().execute().fetchall()
+ assert l == [(1, 'user', 'lala')]
+ finally:
+ testing.db.execute('drop table speedy_users')
+
+
+ @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
+ @testing.fails_on('pypostgresql',
+ 'psycopg2/pg8000 specific assertion')
+ def test_numeric_raise(self):
+ stmt = text("select cast('hi' as char) as hi", typemap={'hi'
+ : Numeric})
+ assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
+
+ def test_serial_integer(self):
+ for type_, expected in [
+ (Integer, 'SERIAL'),
+ (BigInteger, 'BIGSERIAL'),
+ (SmallInteger, 'SMALLINT'),
+ (postgresql.INTEGER, 'SERIAL'),
+ (postgresql.BIGINT, 'BIGSERIAL'),
+ ]:
+ m = MetaData()
+
+ t = Table('t', m, Column('c', type_, primary_key=True))
+ ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t))
+ eq_(
+ ddl_compiler.get_column_specification(t.c.c),
+ "c %s NOT NULL" % expected
+ )