From d1e31ab1582e2d9275c70a89b72efc2a8651df3f Mon Sep 17 00:00:00 2001 From: Roman Podoliaka Date: Fri, 4 Nov 2016 00:31:05 +0200 Subject: Add support for server side cursors to mysqldb and pymysql This allows to skip buffering of the results on the client side, e.g. the following snippet: table = sa.Table( 'testtbl', sa.MetaData(), sa.Column('id', sa.Integer, primary_key=True), sa.Column('a', sa.Integer), sa.Column('b', sa.String(512)) ) table.create(eng, checkfirst=True) with eng.connect() as conn: result = conn.execute(table.select().limit(1)).fetchone() if result is None: for _ in range(1000): conn.execute( table.insert(), [{'a': random.randint(1, 100000), 'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))} for _ in range(1000)] ) with eng.connect() as conn: for row in conn.execution_options(stream_results=True).execute(table.select()): pass now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and PyMySQL 0.7.9. psycopg2 implementation and execution options (stream_results, server_side_cursors) are reused. Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2 --- lib/sqlalchemy/engine/default.py | 46 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) (limited to 'lib/sqlalchemy/engine/default.py') diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3ee240383..719178f7e 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile( r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)', re.I | re.UNICODE) +# When we're handed literal SQL, ensure it's a SELECT query +SERVER_SIDE_CURSOR_RE = re.compile( + r'\s*SELECT', + re.I | re.UNICODE) + class DefaultDialect(interfaces.Dialect): """Default implementation of Dialect""" @@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect): supports_empty_insert = True supports_multivalues_insert = False + supports_server_side_cursors = False + server_version_info = None construct_arguments = None @@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def should_autocommit_text(self, statement): return AUTOCOMMIT_REGEXP.match(statement) + def _use_server_side_cursor(self): + if not self.dialect.supports_server_side_cursors: + return False + + if self.dialect.server_side_cursors: + use_server_side = \ + self.execution_options.get('stream_results', True) and ( + (self.compiled and isinstance(self.compiled.statement, + expression.Selectable) + or + ( + (not self.compiled or + isinstance(self.compiled.statement, + expression.TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match( + self.statement)) + ) + ) + else: + use_server_side = \ + self.execution_options.get('stream_results', False) + + return use_server_side + def create_cursor(self): - return self._dbapi_connection.cursor() + if self._use_server_side_cursor(): + self._is_server_side = True + return self.create_server_side_cursor() + else: + self._is_server_side = False + return self._dbapi_connection.cursor() + + def create_server_side_cursor(self): + raise NotImplementedError() def pre_exec(self): pass @@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): pass def get_result_proxy(self): - return result.ResultProxy(self) + if self._is_server_side: + return result.BufferedRowResultProxy(self) + else: + return result.ResultProxy(self) @property def rowcount(self): -- cgit v1.2.1