summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/engine.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-12-02 03:36:40 +0000
committerGerrit Code Review <review@openstack.org>2015-12-02 03:36:40 +0000
commitaba52d596240f736bca3afb78963d8e428415ca8 (patch)
treec0c89a4895df3d702f523ce9565b5838e516ce9e /taskflow/engines/action_engine/engine.py
parent08883c01446d5090173c2318baa7fb1b08620a88 (diff)
parent4b0b7405b0a927d5df130ee91174f148e0f54f9d (diff)
downloadtaskflow-aba52d596240f736bca3afb78963d8e428415ca8.tar.gz
Merge "Move engine options extraction to __init__ methods"
Diffstat (limited to 'taskflow/engines/action_engine/engine.py')
-rw-r--r--taskflow/engines/action_engine/engine.py55
1 files changed, 51 insertions, 4 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 435b353..36ab11b 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -70,6 +70,40 @@ class ActionEngine(base.Engine):
which will cause the process of reversion or retrying to commence. See the
valid states in the states module to learn more about what other states
the tasks and flow being ran can go through.
+
+ **Engine options:**
+
+ +-------------------+-----------------------+------+-----------+
+ | Name/key | Description | Type | Default |
+ +===================+=======================+======+===========+
+ | defer_reverts | This option lets you | bool | ``False`` |
+ | | safely nest flows | | |
+ | | with retries inside | | |
+ | | flows without retries | | |
+ | | and it still behaves | | |
+ | | as a user would | | |
+ | | expect (for example | | |
+ | | if the retry gets | | |
+ | | exhausted it reverts | | |
+ | | the outer flow unless | | |
+ | | the outer flow has a | | |
+ | | has a separate retry | | |
+ | | behavior). | | |
+ +-------------------+-----------------------+------+-----------+
+ | inject_transient | When true, values | bool | ``True`` |
+ | | that are local to | | |
+ | | each atoms scope | | |
+ | | are injected into | | |
+ | | storage into a | | |
+ | | transient location | | |
+ | | (typically a local | | |
+ | | dictionary), when | | |
+ | | false those values | | |
+ | | are instead persisted | | |
+ | | into atom details | | |
+ | | (and saved in a non- | | |
+ | | transient manner). | | |
+ +-------------------+-----------------------+------+-----------+
"""
NO_RERAISING_STATES = frozenset([states.SUSPENDED, states.SUCCESS])
@@ -100,6 +134,8 @@ class ActionEngine(base.Engine):
# Retries are not *currently* executed out of the engines process
# or thread (this could change in the future if we desire it to).
self._retry_executor = executor.SerialRetryExecutor()
+ self._inject_transient = strutils.bool_from_string(
+ self._options.get('inject_transient', True))
def _check(self, name, check_compiled, check_storage_ensured):
"""Check (and raise) if the engine has not reached a certain stage."""
@@ -256,14 +292,12 @@ class ActionEngine(base.Engine):
def _ensure_storage(self):
"""Ensure all contained atoms exist in the storage unit."""
- transient = strutils.bool_from_string(
- self._options.get('inject_transient', True))
self.storage.ensure_atoms(
self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
if atom.inject:
self.storage.inject_atom_args(atom.name, atom.inject,
- transient=transient)
+ transient=self._inject_transient)
@fasteners.locked
def validate(self):
@@ -371,7 +405,7 @@ class _ExecutorTextMatch(collections.namedtuple('_ExecutorTextMatch',
class ParallelActionEngine(ActionEngine):
"""Engine that runs tasks in parallel manner.
- Supported option keys:
+ **Additional engine options:**
* ``executor``: a object that implements a :pep:`3148` compatible executor
interface; it will be used for scheduling tasks. The following
@@ -401,6 +435,19 @@ String (case insensitive) Executor used
``threads`` :class:`~.executor.ParallelThreadTaskExecutor`
=========================== ===============================================
+ * ``max_workers``: a integer that will affect the number of parallel
+ workers that are used to dispatch tasks into (this number is bounded
+ by the maximum parallelization your workflow can support).
+
+ * ``dispatch_periodicity``: a float (in seconds) that will affect the
+ parallel process task executor (and therefore is **only** applicable when
+ the executor provided above is of the process variant). This number
+ affects how much time the process task executor waits for messages from
+ child processes (typically indicating they have finished or failed). A
+ lower number will have high granularity but *currently* involves more
+ polling while a higher number will involve less polling but a slower time
+ for an engine to notice a task has completed.
+
.. |cfp| replace:: concurrent.futures.process
.. |cft| replace:: concurrent.futures.thread
.. |cf| replace:: concurrent.futures