diff options
| author | Jenkins <jenkins@review.openstack.org> | 2015-12-02 03:36:40 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2015-12-02 03:36:40 +0000 |
| commit | aba52d596240f736bca3afb78963d8e428415ca8 (patch) | |
| tree | c0c89a4895df3d702f523ce9565b5838e516ce9e /taskflow | |
| parent | 08883c01446d5090173c2318baa7fb1b08620a88 (diff) | |
| parent | 4b0b7405b0a927d5df130ee91174f148e0f54f9d (diff) | |
| download | taskflow-aba52d596240f736bca3afb78963d8e428415ca8.tar.gz | |
Merge "Move engine options extraction to __init__ methods"
Diffstat (limited to 'taskflow')
| -rw-r--r-- | taskflow/engines/action_engine/completer.py | 15 | ||||
| -rw-r--r-- | taskflow/engines/action_engine/engine.py | 55 |
2 files changed, 55 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 893279d..c1813aa 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -113,6 +113,8 @@ class Completer(object): self._task_action = runtime.task_action self._retry_action = runtime.retry_action self._undefined_resolver = RevertAll(self._runtime) + self._defer_reverts = strutils.bool_from_string( + self._runtime.options.get('defer_reverts', False)) def _complete_task(self, task, outcome, result): """Completes the given task, processes task failure.""" @@ -186,20 +188,11 @@ class Completer(object): elif strategy == retry_atom.REVERT: # Ask parent retry and figure out what to do... parent_resolver = self._determine_resolution(retry, failure) - # In the future, this will be the only behavior. REVERT # should defer to the parent retry if it exists, or use the - # default REVERT_ALL if it doesn't. This lets you safely nest - # flows with retries inside flows without retries and it still - # behave as a user would expect, i.e. if the retry gets - # exhausted it reverts the outer flow unless the outer flow - # has a separate retry behavior. - defer_reverts = strutils.bool_from_string( - self._runtime.options.get('defer_reverts', False) - ) - if defer_reverts: + # default REVERT_ALL if it doesn't. + if self._defer_reverts: return parent_resolver - # Ok if the parent resolver says something not REVERT, and # it isn't just using the undefined resolver, assume the # parent knows best. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 435b353..36ab11b 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -70,6 +70,40 @@ class ActionEngine(base.Engine): which will cause the process of reversion or retrying to commence. See the valid states in the states module to learn more about what other states the tasks and flow being ran can go through. + + **Engine options:** + + +-------------------+-----------------------+------+-----------+ + | Name/key | Description | Type | Default | + +===================+=======================+======+===========+ + | defer_reverts | This option lets you | bool | ``False`` | + | | safely nest flows | | | + | | with retries inside | | | + | | flows without retries | | | + | | and it still behaves | | | + | | as a user would | | | + | | expect (for example | | | + | | if the retry gets | | | + | | exhausted it reverts | | | + | | the outer flow unless | | | + | | the outer flow has a | | | + | | has a separate retry | | | + | | behavior). | | | + +-------------------+-----------------------+------+-----------+ + | inject_transient | When true, values | bool | ``True`` | + | | that are local to | | | + | | each atoms scope | | | + | | are injected into | | | + | | storage into a | | | + | | transient location | | | + | | (typically a local | | | + | | dictionary), when | | | + | | false those values | | | + | | are instead persisted | | | + | | into atom details | | | + | | (and saved in a non- | | | + | | transient manner). | | | + +-------------------+-----------------------+------+-----------+ """ NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS]) @@ -100,6 +134,8 @@ class ActionEngine(base.Engine): # Retries are not *currently* executed out of the engines process # or thread (this could change in the future if we desire it to). self._retry_executor = executor.SerialRetryExecutor() + self._inject_transient = strutils.bool_from_string( + self._options.get('inject_transient', True)) def _check(self, name, check_compiled, check_storage_ensured): """Check (and raise) if the engine has not reached a certain stage.""" @@ -256,14 +292,12 @@ class ActionEngine(base.Engine): def _ensure_storage(self): """Ensure all contained atoms exist in the storage unit.""" - transient = strutils.bool_from_string( - self._options.get('inject_transient', True)) self.storage.ensure_atoms( self._runtime.analyzer.iterate_nodes(compiler.ATOMS)) for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS): if atom.inject: self.storage.inject_atom_args(atom.name, atom.inject, - transient=transient) + transient=self._inject_transient) @fasteners.locked def validate(self): @@ -371,7 +405,7 @@ class _ExecutorTextMatch(collections.namedtuple('_ExecutorTextMatch', class ParallelActionEngine(ActionEngine): """Engine that runs tasks in parallel manner. - Supported option keys: + **Additional engine options:** * ``executor``: a object that implements a :pep:`3148` compatible executor interface; it will be used for scheduling tasks. The following @@ -401,6 +435,19 @@ String (case insensitive) Executor used ``threads`` :class:`~.executor.ParallelThreadTaskExecutor` =========================== =============================================== + * ``max_workers``: a integer that will affect the number of parallel + workers that are used to dispatch tasks into (this number is bounded + by the maximum parallelization your workflow can support). + + * ``dispatch_periodicity``: a float (in seconds) that will affect the + parallel process task executor (and therefore is **only** applicable when + the executor provided above is of the process variant). This number + affects how much time the process task executor waits for messages from + child processes (typically indicating they have finished or failed). A + lower number will have high granularity but *currently* involves more + polling while a higher number will involve less polling but a slower time + for an engine to notice a task has completed. + .. |cfp| replace:: concurrent.futures.process .. |cft| replace:: concurrent.futures.thread .. |cf| replace:: concurrent.futures |
