summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/postgresql/asyncpg.py
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2023-04-21 18:57:01 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2023-04-21 18:57:01 +0000
commit69ba6e3cf771dce93b18350ab6b4a4ab79604b40 (patch)
tree897cd37c92d52f118c3ce05eefc511309cb5d751 /lib/sqlalchemy/dialects/postgresql/asyncpg.py
parentc84b3bf198c75ad4f42b0f83d482e480200e6d16 (diff)
parent244c29768254d12ff18bb342b154a009080345d6 (diff)
downloadsqlalchemy-69ba6e3cf771dce93b18350ab6b4a4ab79604b40.tar.gz
Merge "Add name_func optional attribute for asyncpg adapter" into main
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py70
1 files changed, 67 insertions, 3 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index d1a52afd6..0c7f17ce9 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -98,6 +98,44 @@ To disable the prepared statement cache, use a value of zero::
stale, nor can it retry the statement as the PostgreSQL transaction is
invalidated when these errors occur.
+.. _asyncpg_prepared_statement_name:
+
+Prepared Statement Name
+-----------------------
+
+By default, asyncpg enumerates prepared statements in numeric order, which
+can lead to errors if a name has already been taken for another prepared
+statement. This issue can arise if your application uses database proxies
+such as PgBouncer to handle connections. One possible workaround is to
+use dynamic prepared statement names, which asyncpg now supports through
+an optional name value for the statement name. This allows you to
+generate your own unique names that won't conflict with existing ones.
+To achieve this, you can provide a function that will be called every time
+a prepared statement is prepared::
+
+ from uuid import uuid4
+
+ engine = create_async_engine(
+ "postgresql+asyncpg://user:pass@hostname/dbname",
+ poolclass=NullPool,
+ connect_args={
+ 'prepared_statement_name_func': lambda: f'__asyncpg_{uuid4()}__',
+ },
+ )
+
+.. seealso::
+
+ https://github.com/MagicStack/asyncpg/issues/837
+
+ https://github.com/sqlalchemy/sqlalchemy/issues/6467
+
+.. warning:: To prevent a buildup of useless prepared statements in
+ your application, it's important to use the NullPool poolclass and
+ PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_
+ setup. The DISCARD command is used to release resources held by the db connection,
+ including prepared statements. Without proper setup, prepared statements can
+ accumulate quickly and cause performance issues.
+
Disabling the PostgreSQL JIT to improve ENUM datatype handling
---------------------------------------------------------------
@@ -647,13 +685,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
"_transaction",
"_started",
"_prepared_statement_cache",
+ "_prepared_statement_name_func",
"_invalidate_schema_cache_asof",
"_execute_mutex",
)
await_ = staticmethod(await_only)
- def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
+ def __init__(
+ self,
+ dbapi,
+ connection,
+ prepared_statement_cache_size=100,
+ prepared_statement_name_func=None,
+ ):
self.dbapi = dbapi
self._connection = connection
self.isolation_level = self._isolation_setting = "read_committed"
@@ -671,6 +716,11 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
else:
self._prepared_statement_cache = None
+ if prepared_statement_name_func:
+ self._prepared_statement_name_func = prepared_statement_name_func
+ else:
+ self._prepared_statement_name_func = self._default_name_func
+
async def _check_type_cache_invalidation(self, invalidate_timestamp):
if invalidate_timestamp > self._invalidate_schema_cache_asof:
await self._connection.reload_schema_state()
@@ -681,7 +731,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
cache = self._prepared_statement_cache
if cache is None:
- prepared_stmt = await self._connection.prepare(operation)
+ prepared_stmt = await self._connection.prepare(
+ operation, name=self._prepared_statement_name_func()
+ )
attributes = prepared_stmt.get_attributes()
return prepared_stmt, attributes
@@ -697,7 +749,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
if cached_timestamp > invalidate_timestamp:
return prepared_stmt, attributes
- prepared_stmt = await self._connection.prepare(operation)
+ prepared_stmt = await self._connection.prepare(
+ operation, name=self._prepared_statement_name_func()
+ )
attributes = prepared_stmt.get_attributes()
cache[operation] = (prepared_stmt, attributes, time.time())
@@ -797,6 +851,10 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
def terminate(self):
self._connection.terminate()
+ @staticmethod
+ def _default_name_func():
+ return None
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
@@ -814,17 +872,23 @@ class AsyncAdapt_asyncpg_dbapi:
prepared_statement_cache_size = kw.pop(
"prepared_statement_cache_size", 100
)
+ prepared_statement_name_func = kw.pop(
+ "prepared_statement_name_func", None
+ )
+
if util.asbool(async_fallback):
return AsyncAdaptFallback_asyncpg_connection(
self,
await_fallback(self.asyncpg.connect(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
+ prepared_statement_name_func=prepared_statement_name_func,
)
else:
return AsyncAdapt_asyncpg_connection(
self,
await_only(self.asyncpg.connect(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
+ prepared_statement_name_func=prepared_statement_name_func,
)
class Error(Exception):