summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/postgresql/asyncpg.py
diff options
context:
space:
mode:
authorPavel Sirotkin <pav.pnz@gmail.com>2023-04-20 13:40:04 -0400
committerFederico Caselli <cfederico87@gmail.com>2023-04-21 19:47:26 +0200
commit244c29768254d12ff18bb342b154a009080345d6 (patch)
tree3bc86916e3a12b3dac7d122cd9c4bd1210d5753f /lib/sqlalchemy/dialects/postgresql/asyncpg.py
parentfc2bcead435a9bf0a2de8e9b15a1bd835f9d7fe4 (diff)
downloadsqlalchemy-244c29768254d12ff18bb342b154a009080345d6.tar.gz
Add name_func optional attribute for asyncpg adapter
I faced an issue related to pg bouncer and prepared statement cache flow in asyncpg dialect. Regarding this discussion https://github.com/sqlalchemy/sqlalchemy/issues/6467 I prepared PR to support an optional parameter `name` in prepared statement which is allowed, since 0.25.0 version in `asyncpg` https://github.com/MagicStack/asyncpg/pull/846 **UPD:** the issue with proposal: https://github.com/sqlalchemy/sqlalchemy/issues/9608 ### Description Added optional parameter `name_func` to `AsyncAdapt_asyncpg_connection` class which will call on the `self._connection.prepare()` function and populate a unique name. so in general instead this ```python from uuid import uuid4 from asyncpg import Connection class CConnection(Connection): def _get_unique_id(self, prefix: str) -> str: return f'__asyncpg_{prefix}_{uuid4()}__' engine = create_async_engine(..., connect_args={ 'connection_class': CConnection, }, ) ``` would be enough ```python from uuid import uuid4 engine = create_async_engine(..., connect_args={ 'name_func': lambda: f'__asyncpg_{uuid4()}__', }, ) ``` ### Checklist <!-- go over following points. check them with an `x` if they do apply, (they turn into clickable checkboxes once the PR is submitted, so no need to do everything at once) --> This pull request is: - [ ] A documentation / typographical error fix - Good to go, no issue or tests are needed - [ ] A short code fix - please include the issue number, and create an issue if none exists, which must include a complete example of the issue. one line code fixes without an issue and demonstration will not be accepted. - Please include: `Fixes: #<issue number>` in the commit message - please include tests. one line code fixes without tests will not be accepted. - [x] A new feature implementation - please include the issue number, and create an issue if none exists, which must include a complete example of how the feature would look. - Please include: `Fixes: #<issue number>` in the commit message - please include tests. **Have a nice day!** Fixes: #9608 Closes: #9607 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/9607 Pull-request-sha: b4bc8d3e57ab095a26112830ad4bea36083454e3 Change-Id: Icd753366cba166b8a60d1c8566377ec8335cd828
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py70
1 files changed, 67 insertions, 3 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 2acc5fea3..f0b4562d2 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -98,6 +98,44 @@ To disable the prepared statement cache, use a value of zero::
stale, nor can it retry the statement as the PostgreSQL transaction is
invalidated when these errors occur.
+.. _asyncpg_prepared_statement_name:
+
+Prepared Statement Name
+-----------------------
+
+By default, asyncpg enumerates prepared statements in numeric order, which
+can lead to errors if a name has already been taken for another prepared
+statement. This issue can arise if your application uses database proxies
+such as PgBouncer to handle connections. One possible workaround is to
+use dynamic prepared statement names, which asyncpg now supports through
+an optional name value for the statement name. This allows you to
+generate your own unique names that won't conflict with existing ones.
+To achieve this, you can provide a function that will be called every time
+a prepared statement is prepared::
+
+ from uuid import uuid4
+
+ engine = create_async_engine(
+ "postgresql+asyncpg://user:pass@hostname/dbname",
+ poolclass=NullPool,
+ connect_args={
+ 'prepared_statement_name_func': lambda: f'__asyncpg_{uuid4()}__',
+ },
+ )
+
+.. seealso::
+
+ https://github.com/MagicStack/asyncpg/issues/837
+
+ https://github.com/sqlalchemy/sqlalchemy/issues/6467
+
+.. warning:: To prevent a buildup of useless prepared statements in
+ your application, it's important to use the NullPool poolclass and
+ PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_
+ setup. The DISCARD command is used to release resources held by the db connection,
+ including prepared statements. Without proper setup, prepared statements can
+ accumulate quickly and cause performance issues.
+
Disabling the PostgreSQL JIT to improve ENUM datatype handling
---------------------------------------------------------------
@@ -642,13 +680,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
"_transaction",
"_started",
"_prepared_statement_cache",
+ "_prepared_statement_name_func",
"_invalidate_schema_cache_asof",
"_execute_mutex",
)
await_ = staticmethod(await_only)
- def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
+ def __init__(
+ self,
+ dbapi,
+ connection,
+ prepared_statement_cache_size=100,
+ prepared_statement_name_func=None,
+ ):
self.dbapi = dbapi
self._connection = connection
self.isolation_level = self._isolation_setting = "read_committed"
@@ -666,6 +711,11 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
else:
self._prepared_statement_cache = None
+ if prepared_statement_name_func:
+ self._prepared_statement_name_func = prepared_statement_name_func
+ else:
+ self._prepared_statement_name_func = self._default_name_func
+
async def _check_type_cache_invalidation(self, invalidate_timestamp):
if invalidate_timestamp > self._invalidate_schema_cache_asof:
await self._connection.reload_schema_state()
@@ -676,7 +726,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
cache = self._prepared_statement_cache
if cache is None:
- prepared_stmt = await self._connection.prepare(operation)
+ prepared_stmt = await self._connection.prepare(
+ operation, name=self._prepared_statement_name_func()
+ )
attributes = prepared_stmt.get_attributes()
return prepared_stmt, attributes
@@ -692,7 +744,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
if cached_timestamp > invalidate_timestamp:
return prepared_stmt, attributes
- prepared_stmt = await self._connection.prepare(operation)
+ prepared_stmt = await self._connection.prepare(
+ operation, name=self._prepared_statement_name_func()
+ )
attributes = prepared_stmt.get_attributes()
cache[operation] = (prepared_stmt, attributes, time.time())
@@ -792,6 +846,10 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
def terminate(self):
self._connection.terminate()
+ @staticmethod
+ def _default_name_func():
+ return None
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
@@ -809,17 +867,23 @@ class AsyncAdapt_asyncpg_dbapi:
prepared_statement_cache_size = kw.pop(
"prepared_statement_cache_size", 100
)
+ prepared_statement_name_func = kw.pop(
+ "prepared_statement_name_func", None
+ )
+
if util.asbool(async_fallback):
return AsyncAdaptFallback_asyncpg_connection(
self,
await_fallback(self.asyncpg.connect(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
+ prepared_statement_name_func=prepared_statement_name_func,
)
else:
return AsyncAdapt_asyncpg_connection(
self,
await_only(self.asyncpg.connect(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
+ prepared_statement_name_func=prepared_statement_name_func,
)
class Error(Exception):