summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql/test_dialect.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/postgresql/test_dialect.py')
-rw-r--r--test/dialect/postgresql/test_dialect.py134
1 files changed, 117 insertions, 17 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 57682686c..02d7ad483 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -8,6 +8,7 @@ from sqlalchemy import BigInteger
from sqlalchemy import bindparam
from sqlalchemy import cast
from sqlalchemy import Column
+from sqlalchemy import create_engine
from sqlalchemy import DateTime
from sqlalchemy import DDL
from sqlalchemy import event
@@ -30,6 +31,9 @@ from sqlalchemy import text
from sqlalchemy import TypeDecorator
from sqlalchemy import util
from sqlalchemy.dialects.postgresql import base as postgresql
+from sqlalchemy.dialects.postgresql import HSTORE
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.postgresql import psycopg as psycopg_dialect
from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_PLAIN
@@ -269,10 +273,12 @@ class PGCodeTest(fixtures.TestBase):
if testing.against("postgresql+pg8000"):
# TODO: is there another way we're supposed to see this?
eq_(errmsg.orig.args[0]["C"], "23505")
- else:
+ elif not testing.against("postgresql+psycopg"):
eq_(errmsg.orig.pgcode, "23505")
- if testing.against("postgresql+asyncpg"):
+ if testing.against("postgresql+asyncpg") or testing.against(
+ "postgresql+psycopg"
+ ):
eq_(errmsg.orig.sqlstate, "23505")
@@ -858,6 +864,13 @@ class MiscBackendTest(
".".join(str(x) for x in v)
)
+ @testing.only_on("postgresql+psycopg")
+ def test_psycopg_version(self):
+ v = testing.db.dialect.psycopg_version
+ assert testing.db.dialect.dbapi.__version__.startswith(
+ ".".join(str(x) for x in v)
+ )
+
@testing.combinations(
((8, 1), False, False),
((8, 1), None, False),
@@ -902,6 +915,7 @@ class MiscBackendTest(
with testing.db.connect().execution_options(
isolation_level="SERIALIZABLE"
) as conn:
+
dbapi_conn = conn.connection.dbapi_connection
is_false(dbapi_conn.autocommit)
@@ -1069,25 +1083,30 @@ class MiscBackendTest(
dbapi_conn.rollback()
eq_(val, "off")
- @testing.requires.psycopg2_compatibility
- def test_psycopg2_non_standard_err(self):
+ @testing.requires.psycopg_compatibility
+ def test_psycopg_non_standard_err(self):
# note that psycopg2 is sometimes called psycopg2cffi
# depending on platform
- psycopg2 = testing.db.dialect.dbapi
- TransactionRollbackError = __import__(
- "%s.extensions" % psycopg2.__name__
- ).extensions.TransactionRollbackError
+ psycopg = testing.db.dialect.dbapi
+ if psycopg.__version__.startswith("3"):
+ TransactionRollbackError = __import__(
+ "%s.errors" % psycopg.__name__
+ ).errors.TransactionRollback
+ else:
+ TransactionRollbackError = __import__(
+ "%s.extensions" % psycopg.__name__
+ ).extensions.TransactionRollbackError
exception = exc.DBAPIError.instance(
"some statement",
{},
TransactionRollbackError("foo"),
- psycopg2.Error,
+ psycopg.Error,
)
assert isinstance(exception, exc.OperationalError)
@testing.requires.no_coverage
- @testing.requires.psycopg2_compatibility
+ @testing.requires.psycopg_compatibility
def test_notice_logging(self):
log = logging.getLogger("sqlalchemy.dialects.postgresql")
buf = logging.handlers.BufferingHandler(100)
@@ -1115,14 +1134,14 @@ $$ LANGUAGE plpgsql;
finally:
log.removeHandler(buf)
log.setLevel(lev)
- msgs = " ".join(b.msg for b in buf.buffer)
+ msgs = " ".join(b.getMessage() for b in buf.buffer)
eq_regex(
msgs,
- "NOTICE: notice: hi there(\nCONTEXT: .*?)? "
- "NOTICE: notice: another note(\nCONTEXT: .*?)?",
+ "NOTICE: [ ]?notice: hi there(\nCONTEXT: .*?)? "
+ "NOTICE: [ ]?notice: another note(\nCONTEXT: .*?)?",
)
- @testing.requires.psycopg2_or_pg8000_compatibility
+ @testing.requires.psycopg_or_pg8000_compatibility
@engines.close_open_connections
def test_client_encoding(self):
c = testing.db.connect()
@@ -1143,7 +1162,7 @@ $$ LANGUAGE plpgsql;
new_encoding = c.exec_driver_sql("show client_encoding").fetchone()[0]
eq_(new_encoding, test_encoding)
- @testing.requires.psycopg2_or_pg8000_compatibility
+ @testing.requires.psycopg_or_pg8000_compatibility
@engines.close_open_connections
def test_autocommit_isolation_level(self):
c = testing.db.connect().execution_options(
@@ -1302,7 +1321,7 @@ $$ LANGUAGE plpgsql;
assert result == [(1, "user", "lala")]
connection.execute(text("DROP TABLE speedy_users"))
- @testing.requires.psycopg2_or_pg8000_compatibility
+ @testing.requires.psycopg_or_pg8000_compatibility
def test_numeric_raise(self, connection):
stmt = text("select cast('hi' as char) as hi").columns(hi=Numeric)
assert_raises(exc.InvalidRequestError, connection.execute, stmt)
@@ -1364,9 +1383,90 @@ $$ LANGUAGE plpgsql;
)
@testing.requires.psycopg2_compatibility
- def test_initial_transaction_state(self):
+ def test_initial_transaction_state_psycopg2(self):
from psycopg2.extensions import STATUS_IN_TRANSACTION
engine = engines.testing_engine()
with engine.connect() as conn:
ne_(conn.connection.status, STATUS_IN_TRANSACTION)
+
+ @testing.only_on("postgresql+psycopg")
+ def test_initial_transaction_state_psycopg(self):
+ from psycopg.pq import TransactionStatus
+
+ engine = engines.testing_engine()
+ with engine.connect() as conn:
+ ne_(
+ conn.connection.dbapi_connection.info.transaction_status,
+ TransactionStatus.INTRANS,
+ )
+
+
+class Psycopg3Test(fixtures.TestBase):
+ __only_on__ = ("postgresql+psycopg",)
+
+ def test_json_correctly_registered(self, testing_engine):
+ import json
+
+ def loads(value):
+ value = json.loads(value)
+ value["x"] = value["x"] + "_loads"
+ return value
+
+ def dumps(value):
+ value = dict(value)
+ value["x"] = "dumps_y"
+ return json.dumps(value)
+
+ engine = testing_engine(
+ options=dict(json_serializer=dumps, json_deserializer=loads)
+ )
+ engine2 = testing_engine(
+ options=dict(
+ json_serializer=json.dumps, json_deserializer=json.loads
+ )
+ )
+
+ s = select(cast({"key": "value", "x": "q"}, JSONB))
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "dumps_y_loads"})
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "dumps_y_loads"})
+ with engine2.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "q"})
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "dumps_y_loads"})
+
+ @testing.requires.hstore
+ def test_hstore_correctly_registered(self, testing_engine):
+ engine = testing_engine(options=dict(use_native_hstore=True))
+ engine2 = testing_engine(options=dict(use_native_hstore=False))
+
+ def rp(self, *a):
+ return lambda a: {"a": "b"}
+
+ with mock.patch.object(HSTORE, "result_processor", side_effect=rp):
+ s = select(cast({"key": "value", "x": "q"}, HSTORE))
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "q"})
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "q"})
+ with engine2.begin() as conn:
+ eq_(conn.scalar(s), {"a": "b"})
+ with engine.begin() as conn:
+ eq_(conn.scalar(s), {"key": "value", "x": "q"})
+
+ def test_get_dialect(self):
+ u = url.URL.create("postgresql://")
+ d = psycopg_dialect.PGDialect_psycopg.get_dialect_cls(u)
+ is_(d, psycopg_dialect.PGDialect_psycopg)
+ d = psycopg_dialect.PGDialect_psycopg.get_async_dialect_cls(u)
+ is_(d, psycopg_dialect.PGDialectAsync_psycopg)
+ d = psycopg_dialect.PGDialectAsync_psycopg.get_dialect_cls(u)
+ is_(d, psycopg_dialect.PGDialectAsync_psycopg)
+ d = psycopg_dialect.PGDialectAsync_psycopg.get_dialect_cls(u)
+ is_(d, psycopg_dialect.PGDialectAsync_psycopg)
+
+ def test_async_version(self):
+ e = create_engine("postgresql+psycopg_async://")
+ is_true(isinstance(e.dialect, psycopg_dialect.PGDialectAsync_psycopg))