diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-27 12:58:12 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-25 13:56:37 -0400 |
| commit | 6930dfc032c3f9f474e71ab4e021c0ef8384930e (patch) | |
| tree | 34b919a3c34edaffda1750f161a629fc5b9a8020 /lib/sqlalchemy/engine | |
| parent | dce8c7a125cb99fad62c76cd145752d5afefae36 (diff) | |
| download | sqlalchemy-6930dfc032c3f9f474e71ab4e021c0ef8384930e.tar.gz | |
Convert execution to move through Session
This patch replaces the ORM execution flow with a
single pathway through Session.execute() for all queries,
including Core and ORM.
Currently included is full support for ORM Query,
Query.from_statement(), select(), as well as the
baked query and horizontal shard systems. Initial
changes have also been made to the dogpile caching
example, which like baked query makes use of a
new ORM-specific execution hook that replaces the
use of both QueryEvents.before_compile() as well
as Query._execute_and_instances() as the central
ORM interception hooks.
select() and Query() constructs alike can be passed to
Session.execute() where they will return ORM
results in a Results object. This API is currently
used internally by Query. Full support for
Session.execute()->results to behave in a fully
2.0 fashion will be in later changesets.
bulk update/delete with ORM support will also
be delivered via the update() and delete()
constructs, however these have not yet been adapted
to the new system and may follow in a subsequent
update.
Performance is also beginning to lag as of this
commit and some previous ones. It is hoped that
a few central functions such as the coercions
functions can be rewritten in C to regain
performance. Additionally, query caching
is now available and some subsequent patches
will attempt to cache more of the per-execution
work from the ORM layer, e.g. column getters
and adapters.
This patch also contains initial "turn on" of the
caching system enginewide via the query_cache_size
parameter to create_engine(). Still defaulting at
zero for "no caching". The caching system still
needs adjustments in order to gain adequate performance.
Change-Id: I047a7ebb26aa85dc01f6789fac2bff561dcd555d
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 62 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/create.py | 17 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 10 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 49 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/result.py | 93 |
5 files changed, 172 insertions, 59 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index ee02899f6..0193ea47c 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -225,14 +225,10 @@ class Connection(Connectable): A dictionary where :class:`.Compiled` objects will be cached when the :class:`_engine.Connection` compiles a clause - expression into a :class:`.Compiled` object. - It is the user's responsibility to - manage the size of this dictionary, which will have keys - corresponding to the dialect, clause element, the column - names within the VALUES or SET clause of an INSERT or UPDATE, - as well as the "batch" mode for an INSERT or UPDATE statement. - The format of this dictionary is not guaranteed to stay the - same in future releases. + expression into a :class:`.Compiled` object. This dictionary will + supersede the statement cache that may be configured on the + :class:`_engine.Engine` itself. If set to None, caching + is disabled, even if the engine has a configured cache size. Note that the ORM makes use of its own "compiled" caches for some operations, including flush operations. 
The caching @@ -1159,13 +1155,17 @@ class Connection(Connectable): schema_translate_map = exec_opts.get("schema_translate_map", None) - if "compiled_cache" in exec_opts: + compiled_cache = exec_opts.get( + "compiled_cache", self.dialect._compiled_cache + ) + + if compiled_cache is not None: elem_cache_key = elem._generate_cache_key() else: elem_cache_key = None if elem_cache_key: - cache_key, extracted_params = elem_cache_key + cache_key, extracted_params, _ = elem_cache_key key = ( dialect, cache_key, @@ -1173,8 +1173,7 @@ class Connection(Connectable): bool(schema_translate_map), len(distilled_params) > 1, ) - cache = exec_opts["compiled_cache"] - compiled_sql = cache.get(key) + compiled_sql = compiled_cache.get(key) if compiled_sql is None: compiled_sql = elem.compile( @@ -1185,12 +1184,8 @@ class Connection(Connectable): schema_translate_map=schema_translate_map, linting=self.dialect.compiler_linting | compiler.WARN_LINTING, - compile_state_factories=exec_opts.get( - "compile_state_factories", None - ), ) - cache[key] = compiled_sql - + compiled_cache[key] = compiled_sql else: extracted_params = None compiled_sql = elem.compile( @@ -1199,9 +1194,6 @@ class Connection(Connectable): inline=len(distilled_params) > 1, schema_translate_map=schema_translate_map, linting=self.dialect.compiler_linting | compiler.WARN_LINTING, - compile_state_factories=exec_opts.get( - "compile_state_factories", None - ), ) ret = self._execute_context( @@ -1430,18 +1422,35 @@ class Connection(Connectable): ) if self._echo: + self.engine.logger.info(statement) + + # stats = context._get_cache_stats() + if not self.engine.hide_parameters: + # TODO: I love the stats but a ton of tests that are hardcoded. + # to certain log output are failing. 
self.engine.logger.info( "%r", sql_util._repr_params( parameters, batches=10, ismulti=context.executemany ), ) + # self.engine.logger.info( + # "[%s] %r", + # stats, + # sql_util._repr_params( + # parameters, batches=10, ismulti=context.executemany + # ), + # ) else: self.engine.logger.info( "[SQL parameters hidden due to hide_parameters=True]" ) + # self.engine.logger.info( + # "[%s] [SQL parameters hidden due to hide_parameters=True]" + # % (stats,) + # ) evt_handled = False try: @@ -1502,19 +1511,14 @@ class Connection(Connectable): # for "connectionless" execution, we have to close this # Connection after the statement is complete. - if branched.should_close_with_result: + # legacy stuff. + if branched.should_close_with_result and context._soft_closed: assert not self._is_future assert not context._is_future_result # CursorResult already exhausted rows / has no rows. - # close us now. note this is where we call .close() - # on the "branched" connection if we're doing that. - if result._soft_closed: - branched.close() - else: - # CursorResult will close this Connection when no more - # rows to fetch. - result._autoclose_connection = True + # close us now + branched.close() except BaseException as e: self._handle_dbapi_exception( e, statement, parameters, cursor, context diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index e683b6297..4c912349e 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -435,6 +435,23 @@ def create_engine(url, **kwargs): .. versionadded:: 1.2.3 + :param query_cache_size: size of the cache used to cache the SQL string + form of queries. Defaults to zero, which disables caching. + + Caching is accomplished on a per-statement basis by generating a + cache key that represents the statement's structure, then generating + string SQL for the current dialect only if that key is not present + in the cache. 
All statements support caching, however some features + such as an INSERT with a large set of parameters will intentionally + bypass the cache. SQL logging will indicate statistics for each + statement whether or not it were pull from the cache. + + .. seealso:: + + ``engine_caching`` - TODO: this will be an upcoming section describing + the SQL caching system. + + .. versionadded:: 1.4 """ # noqa diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index 8d1a1bb57..fdbf826ed 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -57,6 +57,9 @@ class CursorResultMetaData(ResultMetaData): returns_rows = True + def _has_key(self, key): + return key in self._keymap + def _for_freeze(self): return SimpleResultMetaData( self._keys, @@ -1203,6 +1206,7 @@ class BaseCursorResult(object): out_parameters = None _metadata = None + _metadata_from_cache = False _soft_closed = False closed = False @@ -1213,7 +1217,6 @@ class BaseCursorResult(object): obj = CursorResult(context) else: obj = LegacyCursorResult(context) - return obj def __init__(self, context): @@ -1247,8 +1250,9 @@ class BaseCursorResult(object): def _init_metadata(self, context, cursor_description): if context.compiled: if context.compiled._cached_metadata: - cached_md = context.compiled._cached_metadata - self._metadata = cached_md._adapt_to_context(context) + cached_md = self.context.compiled._cached_metadata + self._metadata = cached_md + self._metadata_from_cache = True else: self._metadata = ( diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index e30daaeb8..b5cb2a1b2 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -16,6 +16,7 @@ as the base class for their own corresponding classes. import codecs import random import re +import time import weakref from . 
import cursor as _cursor @@ -226,6 +227,7 @@ class DefaultDialect(interfaces.Dialect): supports_native_boolean=None, max_identifier_length=None, label_length=None, + query_cache_size=0, # int() is because the @deprecated_params decorator cannot accommodate # the direct reference to the "NO_LINTING" object compiler_linting=int(compiler.NO_LINTING), @@ -257,6 +259,10 @@ class DefaultDialect(interfaces.Dialect): if supports_native_boolean is not None: self.supports_native_boolean = supports_native_boolean self.case_sensitive = case_sensitive + if query_cache_size != 0: + self._compiled_cache = util.LRUCache(query_cache_size) + else: + self._compiled_cache = None self._user_defined_max_identifier_length = max_identifier_length if self._user_defined_max_identifier_length: @@ -702,11 +708,17 @@ class DefaultExecutionContext(interfaces.ExecutionContext): result_column_struct = None returned_defaults = None execution_options = util.immutabledict() + + cache_stats = None + invoked_statement = None + _is_implicit_returning = False _is_explicit_returning = False _is_future_result = False _is_server_side = False + _soft_closed = False + # a hook for SQLite's translation of # result column names # NOTE: pyhive is using this hook, can't remove it :( @@ -1011,6 +1023,16 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.cursor = self.create_cursor() return self + def _get_cache_stats(self): + if self.compiled is None: + return "raw SQL" + + now = time.time() + if self.compiled.cache_key is None: + return "gen %.5fs" % (now - self.compiled._gen_time,) + else: + return "cached %.5fs" % (now - self.compiled._gen_time,) + @util.memoized_property def engine(self): return self.root_connection.engine @@ -1234,6 +1256,33 @@ class DefaultExecutionContext(interfaces.ExecutionContext): ): self._setup_out_parameters(result) + if not self._is_future_result: + conn = self.root_connection + assert not conn._is_future + + if not result._soft_closed and 
conn.should_close_with_result: + result._autoclose_connection = True + + self._soft_closed = result._soft_closed + + # result rewrite/ adapt step. two translations can occur here. + # one is if we are invoked against a cached statement, we want + # to rewrite the ResultMetaData to reflect the column objects + # that are in our current selectable, not the cached one. the + # other is, the CompileState can return an alternative Result + # object. Finally, CompileState might want to tell us to not + # actually do the ResultMetaData adapt step if it in fact has + # changed the selected columns in any case. + compiled = self.compiled + if compiled: + adapt_metadata = ( + result._metadata_from_cache + and not compiled._rewrites_selected_columns + ) + + if adapt_metadata: + result._metadata = result._metadata._adapt_to_context(self) + return result def _setup_out_parameters(self, result): diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 4e6b22820..0ee80ede4 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -56,6 +56,9 @@ class ResultMetaData(object): def keys(self): return RMKeyView(self) + def _has_key(self, key): + raise NotImplementedError() + def _for_freeze(self): raise NotImplementedError() @@ -171,6 +174,9 @@ class SimpleResultMetaData(ResultMetaData): self._processors = _processors + def _has_key(self, key): + return key in self._keymap + def _for_freeze(self): unique_filters = self._unique_filters if unique_filters and self._tuplefilter: @@ -287,6 +293,8 @@ class Result(InPlaceGenerative): _no_scalar_onerow = False _yield_per = None + _attributes = util.immutabledict() + def __init__(self, cursor_metadata): self._metadata = cursor_metadata @@ -548,10 +556,21 @@ class Result(InPlaceGenerative): self._generate_rows = True def _row_getter(self): - if self._source_supports_scalars and not self._generate_rows: - return None + if self._source_supports_scalars: + if not self._generate_rows: + return 
None + else: + _proc = self._process_row + + def process_row( + metadata, processors, keymap, key_style, scalar_obj + ): + return _proc( + metadata, processors, keymap, key_style, (scalar_obj,) + ) - process_row = self._process_row + else: + process_row = self._process_row key_style = self._process_row._default_key_style metadata = self._metadata @@ -771,16 +790,15 @@ class Result(InPlaceGenerative): uniques, strategy = self._unique_strategy def filterrows(make_row, rows, strategy, uniques): + if make_row: + rows = [make_row(row) for row in rows] + if strategy: made_rows = ( - (made_row, strategy(made_row)) - for made_row in [make_row(row) for row in rows] + (made_row, strategy(made_row)) for made_row in rows ) else: - made_rows = ( - (made_row, made_row) - for made_row in [make_row(row) for row in rows] - ) + made_rows = ((made_row, made_row) for made_row in rows) return [ made_row for made_row, sig_row in made_rows @@ -831,7 +849,8 @@ class Result(InPlaceGenerative): num = self._yield_per rows = self._fetchmany_impl(num) - rows = [make_row(row) for row in rows] + if make_row: + rows = [make_row(row) for row in rows] if post_creational_filter: rows = [post_creational_filter(row) for row in rows] return rows @@ -1114,24 +1133,42 @@ class FrozenResult(object): def __init__(self, result): self.metadata = result._metadata._for_freeze() self._post_creational_filter = result._post_creational_filter - self._source_supports_scalars = result._source_supports_scalars self._generate_rows = result._generate_rows + self._source_supports_scalars = result._source_supports_scalars + self._attributes = result._attributes result._post_creational_filter = None - self.data = result.fetchall() + if self._source_supports_scalars: + self.data = list(result._raw_row_iterator()) + else: + self.data = result.fetchall() + + def rewrite_rows(self): + if self._source_supports_scalars: + return [[elem] for elem in self.data] + else: + return [list(row) for row in self.data] - def 
with_data(self, data): + def with_new_rows(self, tuple_data): fr = FrozenResult.__new__(FrozenResult) fr.metadata = self.metadata fr._post_creational_filter = self._post_creational_filter - fr.data = data + fr._generate_rows = self._generate_rows + fr._attributes = self._attributes + fr._source_supports_scalars = self._source_supports_scalars + + if self._source_supports_scalars: + fr.data = [d[0] for d in tuple_data] + else: + fr.data = tuple_data return fr def __call__(self): result = IteratorResult(self.metadata, iter(self.data)) result._post_creational_filter = self._post_creational_filter - result._source_supports_scalars = self._source_supports_scalars result._generate_rows = self._generate_rows + result._attributes = self._attributes + result._source_supports_scalars = self._source_supports_scalars return result @@ -1143,9 +1180,10 @@ class IteratorResult(Result): """ - def __init__(self, cursor_metadata, iterator): + def __init__(self, cursor_metadata, iterator, raw=None): self._metadata = cursor_metadata self.iterator = iterator + self.raw = raw def _soft_close(self, **kw): self.iterator = iter([]) @@ -1189,28 +1227,23 @@ class ChunkedIteratorResult(IteratorResult): """ - def __init__(self, cursor_metadata, chunks, source_supports_scalars=False): + def __init__( + self, cursor_metadata, chunks, source_supports_scalars=False, raw=None + ): self._metadata = cursor_metadata self.chunks = chunks self._source_supports_scalars = source_supports_scalars - - self.iterator = itertools.chain.from_iterable( - self.chunks(None, self._generate_rows) - ) + self.raw = raw + self.iterator = itertools.chain.from_iterable(self.chunks(None)) def _column_slices(self, indexes): result = super(ChunkedIteratorResult, self)._column_slices(indexes) - self.iterator = itertools.chain.from_iterable( - self.chunks(self._yield_per, self._generate_rows) - ) return result @_generative def yield_per(self, num): self._yield_per = num - self.iterator = itertools.chain.from_iterable( - 
self.chunks(num, self._generate_rows) - ) + self.iterator = itertools.chain.from_iterable(self.chunks(num)) class MergedResult(IteratorResult): @@ -1238,8 +1271,14 @@ class MergedResult(IteratorResult): self._post_creational_filter = results[0]._post_creational_filter self._no_scalar_onerow = results[0]._no_scalar_onerow self._yield_per = results[0]._yield_per + + # going to try someting w/ this in next rev self._source_supports_scalars = results[0]._source_supports_scalars + self._generate_rows = results[0]._generate_rows + self._attributes = self._attributes.merge_with( + *[r._attributes for r in results] + ) def close(self): self._soft_close(hard=True) |
