summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r--lib/sqlalchemy/engine/base.py2
-rw-r--r--lib/sqlalchemy/engine/default.py46
2 files changed, 45 insertions, 3 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 1d23c66b3..f071abaa1 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -295,7 +295,7 @@ class Connection(Connectable):
Indicate to the dialect that results should be
"streamed" and not pre-buffered, if possible. This is a limitation
of many DBAPIs. The flag is currently understood only by the
- psycopg2 dialect.
+ psycopg2, mysqldb and pymysql dialects.
:param schema_translate_map: Available on: Connection, Engine.
A dictionary mapping schema names to schema names, that will be
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 3ee240383..719178f7e 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile(
r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
re.I | re.UNICODE)
+# When we're handed literal SQL, ensure it's a SELECT query
+SERVER_SIDE_CURSOR_RE = re.compile(
+ r'\s*SELECT',
+ re.I | re.UNICODE)
+
class DefaultDialect(interfaces.Dialect):
"""Default implementation of Dialect"""
@@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect):
supports_empty_insert = True
supports_multivalues_insert = False
+ supports_server_side_cursors = False
+
server_version_info = None
construct_arguments = None
@@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
def should_autocommit_text(self, statement):
return AUTOCOMMIT_REGEXP.match(statement)
+ def _use_server_side_cursor(self):
+ if not self.dialect.supports_server_side_cursors:
+ return False
+
+ if self.dialect.server_side_cursors:
+ use_server_side = \
+ self.execution_options.get('stream_results', True) and (
+ (self.compiled and isinstance(self.compiled.statement,
+ expression.Selectable)
+ or
+ (
+ (not self.compiled or
+ isinstance(self.compiled.statement,
+ expression.TextClause))
+ and self.statement and SERVER_SIDE_CURSOR_RE.match(
+ self.statement))
+ )
+ )
+ else:
+ use_server_side = \
+ self.execution_options.get('stream_results', False)
+
+ return use_server_side
+
def create_cursor(self):
- return self._dbapi_connection.cursor()
+ if self._use_server_side_cursor():
+ self._is_server_side = True
+ return self.create_server_side_cursor()
+ else:
+ self._is_server_side = False
+ return self._dbapi_connection.cursor()
+
+ def create_server_side_cursor(self):
+ raise NotImplementedError()
def pre_exec(self):
pass
@@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
pass
def get_result_proxy(self):
- return result.ResultProxy(self)
+ if self._is_server_side:
+ return result.BufferedRowResultProxy(self)
+ else:
+ return result.ResultProxy(self)
@property
def rowcount(self):