diff options
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 46 |
2 files changed, 45 insertions, 3 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 1d23c66b3..f071abaa1 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -295,7 +295,7 @@ class Connection(Connectable): Indicate to the dialect that results should be "streamed" and not pre-buffered, if possible. This is a limitation of many DBAPIs. The flag is currently understood only by the - psycopg2 dialect. + psycopg2, mysqldb and pymysql dialects. :param schema_translate_map: Available on: Connection, Engine. A dictionary mapping schema names to schema names, that will be diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3ee240383..719178f7e 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile( r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)', re.I | re.UNICODE) +# When we're handed literal SQL, ensure it's a SELECT query +SERVER_SIDE_CURSOR_RE = re.compile( + r'\s*SELECT', + re.I | re.UNICODE) + class DefaultDialect(interfaces.Dialect): """Default implementation of Dialect""" @@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect): supports_empty_insert = True supports_multivalues_insert = False + supports_server_side_cursors = False + server_version_info = None construct_arguments = None @@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def should_autocommit_text(self, statement): return AUTOCOMMIT_REGEXP.match(statement) + def _use_server_side_cursor(self): + if not self.dialect.supports_server_side_cursors: + return False + + if self.dialect.server_side_cursors: + use_server_side = \ + self.execution_options.get('stream_results', True) and ( + (self.compiled and isinstance(self.compiled.statement, + expression.Selectable) + or + ( + (not self.compiled or + isinstance(self.compiled.statement, + expression.TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match( + self.statement)) + ) + ) + else: + use_server_side = \ + self.execution_options.get('stream_results', False) + + return use_server_side + def create_cursor(self): - return self._dbapi_connection.cursor() + if self._use_server_side_cursor(): + self._is_server_side = True + return self.create_server_side_cursor() + else: + self._is_server_side = False + return self._dbapi_connection.cursor() + + def create_server_side_cursor(self): + raise NotImplementedError() def pre_exec(self): pass @@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): pass def get_result_proxy(self): - return result.ResultProxy(self) + if self._is_server_side: + return result.BufferedRowResultProxy(self) + else: + return result.ResultProxy(self) @property def rowcount(self): |
