summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-04-27 12:58:12 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-05-25 13:56:37 -0400
commit6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch)
tree34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/engine
parentdce8c7a125cb99fad62c76cd145752d5afefae36 (diff)
downloadsqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz
Convert execution to move through Session
This patch replaces the ORM execution flow with a single pathway through Session.execute() for all queries, including Core and ORM. Currently included is full support for ORM Query, Query.from_statement(), select(), as well as the baked query and horizontal shard systems. Initial changes have also been made to the dogpile caching example, which like baked query makes use of a new ORM-specific execution hook that replaces the use of both QueryEvents.before_compile() as well as Query._execute_and_instances() as the central ORM interception hooks. select() and Query() constructs alike can be passed to Session.execute() where they will return ORM results in a Results object. This API is currently used internally by Query. Full support for Session.execute()->results to behave in a fully 2.0 fashion will be in later changesets. Bulk update/delete with ORM support will also be delivered via the update() and delete() constructs, however these have not yet been adapted to the new system and may follow in a subsequent update. Performance is also beginning to lag as of this commit and some previous ones. It is hoped that a few central functions such as the coercions functions can be rewritten in C to regain performance. Additionally, query caching is now available and some subsequent patches will attempt to cache more of the per-execution work from the ORM layer, e.g. column getters and adapters. This patch also contains initial "turn on" of the caching system enginewide via the query_cache_size parameter to create_engine(). Still defaulting to zero for "no caching". The caching system still needs adjustments in order to gain adequate performance. Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r--lib/sqlalchemy/engine/base.py62
-rw-r--r--lib/sqlalchemy/engine/create.py17
-rw-r--r--lib/sqlalchemy/engine/cursor.py10
-rw-r--r--lib/sqlalchemy/engine/default.py49
-rw-r--r--lib/sqlalchemy/engine/result.py93
5 files changed, 172 insertions, 59 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index ee02899f6..0193ea47c 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -225,14 +225,10 @@ class Connection(Connectable):
A dictionary where :class:`.Compiled` objects
will be cached when the :class:`_engine.Connection`
compiles a clause
- expression into a :class:`.Compiled` object.
- It is the user's responsibility to
- manage the size of this dictionary, which will have keys
- corresponding to the dialect, clause element, the column
- names within the VALUES or SET clause of an INSERT or UPDATE,
- as well as the "batch" mode for an INSERT or UPDATE statement.
- The format of this dictionary is not guaranteed to stay the
- same in future releases.
+ expression into a :class:`.Compiled` object. This dictionary will
+ supersede the statement cache that may be configured on the
+ :class:`_engine.Engine` itself. If set to None, caching
+ is disabled, even if the engine has a configured cache size.
Note that the ORM makes use of its own "compiled" caches for
some operations, including flush operations. The caching
@@ -1159,13 +1155,17 @@ class Connection(Connectable):
schema_translate_map = exec_opts.get("schema_translate_map", None)
- if "compiled_cache" in exec_opts:
+ compiled_cache = exec_opts.get(
+ "compiled_cache", self.dialect._compiled_cache
+ )
+
+ if compiled_cache is not None:
elem_cache_key = elem._generate_cache_key()
else:
elem_cache_key = None
if elem_cache_key:
- cache_key, extracted_params = elem_cache_key
+ cache_key, extracted_params, _ = elem_cache_key
key = (
dialect,
cache_key,
@@ -1173,8 +1173,7 @@ class Connection(Connectable):
bool(schema_translate_map),
len(distilled_params) > 1,
)
- cache = exec_opts["compiled_cache"]
- compiled_sql = cache.get(key)
+ compiled_sql = compiled_cache.get(key)
if compiled_sql is None:
compiled_sql = elem.compile(
@@ -1185,12 +1184,8 @@ class Connection(Connectable):
schema_translate_map=schema_translate_map,
linting=self.dialect.compiler_linting
| compiler.WARN_LINTING,
- compile_state_factories=exec_opts.get(
- "compile_state_factories", None
- ),
)
- cache[key] = compiled_sql
-
+ compiled_cache[key] = compiled_sql
else:
extracted_params = None
compiled_sql = elem.compile(
@@ -1199,9 +1194,6 @@ class Connection(Connectable):
inline=len(distilled_params) > 1,
schema_translate_map=schema_translate_map,
linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
- compile_state_factories=exec_opts.get(
- "compile_state_factories", None
- ),
)
ret = self._execute_context(
@@ -1430,18 +1422,35 @@ class Connection(Connectable):
)
if self._echo:
+
self.engine.logger.info(statement)
+
+ # stats = context._get_cache_stats()
+
if not self.engine.hide_parameters:
+ # TODO: I love the stats but a ton of tests that are hardcoded
# to certain log output are failing.
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
+ # self.engine.logger.info(
+ # "[%s] %r",
+ # stats,
+ # sql_util._repr_params(
+ # parameters, batches=10, ismulti=context.executemany
+ # ),
+ # )
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
+ # self.engine.logger.info(
+ # "[%s] [SQL parameters hidden due to hide_parameters=True]"
+ # % (stats,)
+ # )
evt_handled = False
try:
@@ -1502,19 +1511,14 @@ class Connection(Connectable):
# for "connectionless" execution, we have to close this
# Connection after the statement is complete.
- if branched.should_close_with_result:
+ # legacy stuff.
+ if branched.should_close_with_result and context._soft_closed:
assert not self._is_future
assert not context._is_future_result
# CursorResult already exhausted rows / has no rows.
- # close us now. note this is where we call .close()
- # on the "branched" connection if we're doing that.
- if result._soft_closed:
- branched.close()
- else:
- # CursorResult will close this Connection when no more
- # rows to fetch.
- result._autoclose_connection = True
+ # close us now
+ branched.close()
except BaseException as e:
self._handle_dbapi_exception(
e, statement, parameters, cursor, context
diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py
index e683b6297..4c912349e 100644
--- a/lib/sqlalchemy/engine/create.py
+++ b/lib/sqlalchemy/engine/create.py
@@ -435,6 +435,23 @@ def create_engine(url, **kwargs):
.. versionadded:: 1.2.3
+ :param query_cache_size: size of the cache used to cache the SQL string
+ form of queries. Defaults to zero, which disables caching.
+
+ Caching is accomplished on a per-statement basis by generating a
+ cache key that represents the statement's structure, then generating
+ string SQL for the current dialect only if that key is not present
+ in the cache. All statements support caching, however some features
+ such as an INSERT with a large set of parameters will intentionally
+ bypass the cache. SQL logging will indicate statistics for each
statement whether or not it was pulled from the cache.
+
+ .. seealso::
+
+ ``engine_caching`` - TODO: this will be an upcoming section describing
+ the SQL caching system.
+
+ .. versionadded:: 1.4
""" # noqa
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 8d1a1bb57..fdbf826ed 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -57,6 +57,9 @@ class CursorResultMetaData(ResultMetaData):
returns_rows = True
+ def _has_key(self, key):
+ return key in self._keymap
+
def _for_freeze(self):
return SimpleResultMetaData(
self._keys,
@@ -1203,6 +1206,7 @@ class BaseCursorResult(object):
out_parameters = None
_metadata = None
+ _metadata_from_cache = False
_soft_closed = False
closed = False
@@ -1213,7 +1217,6 @@ class BaseCursorResult(object):
obj = CursorResult(context)
else:
obj = LegacyCursorResult(context)
-
return obj
def __init__(self, context):
@@ -1247,8 +1250,9 @@ class BaseCursorResult(object):
def _init_metadata(self, context, cursor_description):
if context.compiled:
if context.compiled._cached_metadata:
- cached_md = context.compiled._cached_metadata
- self._metadata = cached_md._adapt_to_context(context)
+ cached_md = self.context.compiled._cached_metadata
+ self._metadata = cached_md
+ self._metadata_from_cache = True
else:
self._metadata = (
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index e30daaeb8..b5cb2a1b2 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -16,6 +16,7 @@ as the base class for their own corresponding classes.
import codecs
import random
import re
+import time
import weakref
from . import cursor as _cursor
@@ -226,6 +227,7 @@ class DefaultDialect(interfaces.Dialect):
supports_native_boolean=None,
max_identifier_length=None,
label_length=None,
+ query_cache_size=0,
# int() is because the @deprecated_params decorator cannot accommodate
# the direct reference to the "NO_LINTING" object
compiler_linting=int(compiler.NO_LINTING),
@@ -257,6 +259,10 @@ class DefaultDialect(interfaces.Dialect):
if supports_native_boolean is not None:
self.supports_native_boolean = supports_native_boolean
self.case_sensitive = case_sensitive
+ if query_cache_size != 0:
+ self._compiled_cache = util.LRUCache(query_cache_size)
+ else:
+ self._compiled_cache = None
self._user_defined_max_identifier_length = max_identifier_length
if self._user_defined_max_identifier_length:
@@ -702,11 +708,17 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
result_column_struct = None
returned_defaults = None
execution_options = util.immutabledict()
+
+ cache_stats = None
+ invoked_statement = None
+
_is_implicit_returning = False
_is_explicit_returning = False
_is_future_result = False
_is_server_side = False
+ _soft_closed = False
+
# a hook for SQLite's translation of
# result column names
# NOTE: pyhive is using this hook, can't remove it :(
@@ -1011,6 +1023,16 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
self.cursor = self.create_cursor()
return self
+ def _get_cache_stats(self):
+ if self.compiled is None:
+ return "raw SQL"
+
+ now = time.time()
+ if self.compiled.cache_key is None:
+ return "gen %.5fs" % (now - self.compiled._gen_time,)
+ else:
+ return "cached %.5fs" % (now - self.compiled._gen_time,)
+
@util.memoized_property
def engine(self):
return self.root_connection.engine
@@ -1234,6 +1256,33 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
):
self._setup_out_parameters(result)
+ if not self._is_future_result:
+ conn = self.root_connection
+ assert not conn._is_future
+
+ if not result._soft_closed and conn.should_close_with_result:
+ result._autoclose_connection = True
+
+ self._soft_closed = result._soft_closed
+
+ # result rewrite/ adapt step. two translations can occur here.
+ # one is if we are invoked against a cached statement, we want
+ # to rewrite the ResultMetaData to reflect the column objects
+ # that are in our current selectable, not the cached one. the
+ # other is, the CompileState can return an alternative Result
+ # object. Finally, CompileState might want to tell us to not
+ # actually do the ResultMetaData adapt step if it in fact has
+ # changed the selected columns in any case.
+ compiled = self.compiled
+ if compiled:
+ adapt_metadata = (
+ result._metadata_from_cache
+ and not compiled._rewrites_selected_columns
+ )
+
+ if adapt_metadata:
+ result._metadata = result._metadata._adapt_to_context(self)
+
return result
def _setup_out_parameters(self, result):
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 4e6b22820..0ee80ede4 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -56,6 +56,9 @@ class ResultMetaData(object):
def keys(self):
return RMKeyView(self)
+ def _has_key(self, key):
+ raise NotImplementedError()
+
def _for_freeze(self):
raise NotImplementedError()
@@ -171,6 +174,9 @@ class SimpleResultMetaData(ResultMetaData):
self._processors = _processors
+ def _has_key(self, key):
+ return key in self._keymap
+
def _for_freeze(self):
unique_filters = self._unique_filters
if unique_filters and self._tuplefilter:
@@ -287,6 +293,8 @@ class Result(InPlaceGenerative):
_no_scalar_onerow = False
_yield_per = None
+ _attributes = util.immutabledict()
+
def __init__(self, cursor_metadata):
self._metadata = cursor_metadata
@@ -548,10 +556,21 @@ class Result(InPlaceGenerative):
self._generate_rows = True
def _row_getter(self):
- if self._source_supports_scalars and not self._generate_rows:
- return None
+ if self._source_supports_scalars:
+ if not self._generate_rows:
+ return None
+ else:
+ _proc = self._process_row
+
+ def process_row(
+ metadata, processors, keymap, key_style, scalar_obj
+ ):
+ return _proc(
+ metadata, processors, keymap, key_style, (scalar_obj,)
+ )
- process_row = self._process_row
+ else:
+ process_row = self._process_row
key_style = self._process_row._default_key_style
metadata = self._metadata
@@ -771,16 +790,15 @@ class Result(InPlaceGenerative):
uniques, strategy = self._unique_strategy
def filterrows(make_row, rows, strategy, uniques):
+ if make_row:
+ rows = [make_row(row) for row in rows]
+
if strategy:
made_rows = (
- (made_row, strategy(made_row))
- for made_row in [make_row(row) for row in rows]
+ (made_row, strategy(made_row)) for made_row in rows
)
else:
- made_rows = (
- (made_row, made_row)
- for made_row in [make_row(row) for row in rows]
- )
+ made_rows = ((made_row, made_row) for made_row in rows)
return [
made_row
for made_row, sig_row in made_rows
@@ -831,7 +849,8 @@ class Result(InPlaceGenerative):
num = self._yield_per
rows = self._fetchmany_impl(num)
- rows = [make_row(row) for row in rows]
+ if make_row:
+ rows = [make_row(row) for row in rows]
if post_creational_filter:
rows = [post_creational_filter(row) for row in rows]
return rows
@@ -1114,24 +1133,42 @@ class FrozenResult(object):
def __init__(self, result):
self.metadata = result._metadata._for_freeze()
self._post_creational_filter = result._post_creational_filter
- self._source_supports_scalars = result._source_supports_scalars
self._generate_rows = result._generate_rows
+ self._source_supports_scalars = result._source_supports_scalars
+ self._attributes = result._attributes
result._post_creational_filter = None
- self.data = result.fetchall()
+ if self._source_supports_scalars:
+ self.data = list(result._raw_row_iterator())
+ else:
+ self.data = result.fetchall()
+
+ def rewrite_rows(self):
+ if self._source_supports_scalars:
+ return [[elem] for elem in self.data]
+ else:
+ return [list(row) for row in self.data]
- def with_data(self, data):
+ def with_new_rows(self, tuple_data):
fr = FrozenResult.__new__(FrozenResult)
fr.metadata = self.metadata
fr._post_creational_filter = self._post_creational_filter
- fr.data = data
+ fr._generate_rows = self._generate_rows
+ fr._attributes = self._attributes
+ fr._source_supports_scalars = self._source_supports_scalars
+
+ if self._source_supports_scalars:
+ fr.data = [d[0] for d in tuple_data]
+ else:
+ fr.data = tuple_data
return fr
def __call__(self):
result = IteratorResult(self.metadata, iter(self.data))
result._post_creational_filter = self._post_creational_filter
- result._source_supports_scalars = self._source_supports_scalars
result._generate_rows = self._generate_rows
+ result._attributes = self._attributes
+ result._source_supports_scalars = self._source_supports_scalars
return result
@@ -1143,9 +1180,10 @@ class IteratorResult(Result):
"""
- def __init__(self, cursor_metadata, iterator):
+ def __init__(self, cursor_metadata, iterator, raw=None):
self._metadata = cursor_metadata
self.iterator = iterator
+ self.raw = raw
def _soft_close(self, **kw):
self.iterator = iter([])
@@ -1189,28 +1227,23 @@ class ChunkedIteratorResult(IteratorResult):
"""
- def __init__(self, cursor_metadata, chunks, source_supports_scalars=False):
+ def __init__(
+ self, cursor_metadata, chunks, source_supports_scalars=False, raw=None
+ ):
self._metadata = cursor_metadata
self.chunks = chunks
self._source_supports_scalars = source_supports_scalars
-
- self.iterator = itertools.chain.from_iterable(
- self.chunks(None, self._generate_rows)
- )
+ self.raw = raw
+ self.iterator = itertools.chain.from_iterable(self.chunks(None))
def _column_slices(self, indexes):
result = super(ChunkedIteratorResult, self)._column_slices(indexes)
- self.iterator = itertools.chain.from_iterable(
- self.chunks(self._yield_per, self._generate_rows)
- )
return result
@_generative
def yield_per(self, num):
self._yield_per = num
- self.iterator = itertools.chain.from_iterable(
- self.chunks(num, self._generate_rows)
- )
+ self.iterator = itertools.chain.from_iterable(self.chunks(num))
class MergedResult(IteratorResult):
@@ -1238,8 +1271,14 @@ class MergedResult(IteratorResult):
self._post_creational_filter = results[0]._post_creational_filter
self._no_scalar_onerow = results[0]._no_scalar_onerow
self._yield_per = results[0]._yield_per
+
+ # going to try something w/ this in next rev
self._source_supports_scalars = results[0]._source_supports_scalars
+
self._generate_rows = results[0]._generate_rows
+ self._attributes = self._attributes.merge_with(
+ *[r._attributes for r in results]
+ )
def close(self):
self._soft_close(hard=True)