summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2013-06-25 12:59:29 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2013-07-06 15:03:20 -0700
commit06833fee4035b4797919f4f9a43c4b5ca270d1f8 (patch)
tree3d8bdafd01cf80dd85de54eb82c1a49639aeda76 /taskflow/patterns
parentd746a93171ccfb5507f8a71b3e71814fc89d2a7f (diff)
downloadtaskflow-06833fee4035b4797919f4f9a43c4b5ca270d1f8.tar.gz
Move how resuming is done to be disconnected from jobs/flows.
Instead of having resuming tied to a job, allow a workflow to have a resumption strategy object that will split its initial work order into two segments: one that has finished previously and one that has not. Refactor the code that previously tied a single resumption strategy to the job class and move it to a more generic resumption module folder. Change-Id: I8709cd6cb7a9deecefe8d2927be517a00acb422d
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/base.py4
-rw-r--r--taskflow/patterns/graph_flow.py13
-rw-r--r--taskflow/patterns/linear_flow.py93
-rw-r--r--taskflow/patterns/resumption/__init__.py17
-rw-r--r--taskflow/patterns/resumption/logbook.py141
5 files changed, 208 insertions, 60 deletions
diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py
index ccea366..a20a709 100644
--- a/taskflow/patterns/base.py
+++ b/taskflow/patterns/base.py
@@ -89,8 +89,8 @@ class Flow(object):
def __str__(self):
lines = ["Flow: %s" % (self.name)]
- lines.append(" State: %s" % (self.state))
- return "\n".join(lines)
+ lines.append("%s" % (self.state))
+ return "; ".join(lines)
@abc.abstractmethod
def add(self, task):
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 83a7433..c9683cb 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -50,6 +50,7 @@ class Flow(linear_flow.Flow):
r = utils.Runner(task)
self._graph.add_node(r, uuid=r.uuid)
self._runners = []
+ self._leftoff_at = None
return r.uuid
def _add_dependency(self, provider, requirer):
@@ -58,11 +59,10 @@ class Flow(linear_flow.Flow):
def __str__(self):
lines = ["GraphFlow: %s" % (self.name)]
- lines.append(" Number of tasks: %s" % (self._graph.number_of_nodes()))
- lines.append(" Number of dependencies: %s"
- % (self._graph.number_of_edges()))
- lines.append(" State: %s" % (self.state))
- return "\n".join(lines)
+ lines.append("%s" % (self._graph.number_of_nodes()))
+ lines.append("%s" % (self._graph.number_of_edges()))
+ lines.append("%s" % (self.state))
+ return "; ".join(lines)
@decorators.locked
def remove(self, task_uuid):
@@ -76,10 +76,11 @@ class Flow(linear_flow.Flow):
for r in remove_nodes:
self._graph.remove_node(r)
self._runners = []
+ self._leftoff_at = None
def _ordering(self):
try:
- return self._connect()
+ return iter(self._connect())
except g_exc.NetworkXUnfeasible:
raise exc.InvalidStateException("Unable to correctly determine "
"the path through the provided "
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index 8122845..6ab0838 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -18,7 +18,6 @@
import collections
import copy
-import functools
import logging
from taskflow.openstack.common import excutils
@@ -46,23 +45,17 @@ class Flow(base.Flow):
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
- # This should be a functor that returns whether a given task has
- # already ran by returning a pair of (has_result, was_error, result).
- #
- # NOTE(harlowja): This allows for resumption by skipping tasks which
- # have already occurred. The previous return value is needed due to
- # the contract we have with tasks that they will be given the value
- # they returned if reversion is triggered.
- self.result_fetcher = None
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
- # The last index in the order we left off at before being
- # interrupted (or failing).
- self._left_off_at = 0
+ # The previously left off iterator that can be used to resume from
+ # the last task (if interrupted and soft-reset).
+ self._leftoff_at = None
# All runners to run are collected here.
self._runners = []
self._connected = False
+ # The resumption strategy to use.
+ self.resumer = None
@decorators.locked
def add_many(self, tasks):
@@ -78,6 +71,7 @@ class Flow(base.Flow):
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
self._connected = False
+ self._leftoff_at = None
self._runners.append(r)
return r.uuid
@@ -104,10 +98,9 @@ class Flow(base.Flow):
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
- lines.append(" Number of tasks: %s" % (len(self._runners)))
- lines.append(" Last index: %s" % (self._left_off_at))
- lines.append(" State: %s" % (self.state))
- return "\n".join(lines)
+ lines.append("%s" % (len(self._runners)))
+ lines.append("%s" % (self.state))
+ return "; ".join(lines)
@decorators.locked
def remove(self, task_uuid):
@@ -116,6 +109,7 @@ class Flow(base.Flow):
if r.uuid == task_uuid:
self._runners.pop(i)
self._connected = False
+ self._leftoff_at = None
removed = True
break
if not removed:
@@ -132,22 +126,26 @@ class Flow(base.Flow):
return self._runners
def _ordering(self):
- return self._connect()
+ return iter(self._connect())
@decorators.locked
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
- if self.result_fetcher:
- result_fetcher = functools.partial(self.result_fetcher, context)
- else:
- result_fetcher = None
+ def resume_it():
+ if self._leftoff_at is not None:
+ return ([], self._leftoff_at)
+ if self.resumer:
+ (finished, leftover) = self.resumer.resume(self,
+ self._ordering())
+ else:
+ finished = []
+ leftover = self._ordering()
+ return (finished, leftover)
self._change_state(context, states.STARTED)
try:
- run_order = self._ordering()
- if self._left_off_at > 0:
- run_order = run_order[self._left_off_at:]
+ those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
@@ -169,6 +167,9 @@ class Flow(base.Flow):
result = runner(context, *args, **kwargs)
else:
if failed:
+ # TODO(harlowja): make this configurable??
+ # If we previously failed, we want to fail again at
+ # the same place.
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
@@ -196,8 +197,6 @@ class Flow(base.Flow):
# intentionally).
rb.result = result
runner.result = result
- # Alter the index we have ran at.
- self._left_off_at += 1
self.results[runner.uuid] = copy.deepcopy(result)
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
@@ -207,7 +206,7 @@ class Flow(base.Flow):
'task_uuid': runner.uuid,
})
except Exception as e:
- cause = utils.FlowFailure(runner.task, self, e)
+ cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
@@ -219,51 +218,41 @@ class Flow(base.Flow):
})
self.rollback(context, cause)
- # Ensure in a ready to run state.
- for runner in run_order:
- runner.reset()
-
- last_runner = 0
- was_interrupted = False
- if result_fetcher:
+ if len(those_finished):
self._change_state(context, states.RESUMING)
- for (i, runner) in enumerate(run_order):
- if self.state == states.INTERRUPTED:
- was_interrupted = True
- break
- (has_result, was_error, result) = result_fetcher(self,
- runner.task,
- runner.uuid)
- if not has_result:
- break
+ for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
- last_runner = i + 1
- run_it(runner, failed=was_error, result=result,
- simulate_run=True)
+ failed = states.FAILURE in details.get('states', [])
+ result = details.get('result')
+ run_it(r, failed=failed, result=result, simulate_run=True)
- if was_interrupted:
+ self._leftoff_at = leftover
+ self._change_state(context, states.RUNNING)
+ if self.state == states.INTERRUPTED:
return
- self._change_state(context, states.RUNNING)
- for runner in run_order[last_runner:]:
+ was_interrupted = False
+ for r in leftover:
+ r.reset()
+ run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
- run_it(runner)
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
+ self._leftoff_at = None
@decorators.locked
def reset(self):
super(Flow, self).reset()
self.results = {}
- self.result_fetcher = None
+ self.resumer = None
self._accumulator.reset()
- self._left_off_at = 0
+ self._leftoff_at = None
self._connected = False
@decorators.locked
diff --git a/taskflow/patterns/resumption/__init__.py b/taskflow/patterns/resumption/__init__.py
new file mode 100644
index 0000000..830dd2e
--- /dev/null
+++ b/taskflow/patterns/resumption/__init__.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py
new file mode 100644
index 0000000..a77cd66
--- /dev/null
+++ b/taskflow/patterns/resumption/logbook.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from taskflow import states
+from taskflow import utils
+
+LOG = logging.getLogger(__name__)
+
+
+class Resumption(object):
+ # NOTE(harlowja): This allows for resumption by skipping tasks which
+ # have already occurred, aka fast-forwarding through a workflow to
+ # the last point it stopped (if possible).
+ def __init__(self, logbook):
+ self._logbook = logbook
+
+ def record_for(self, flow):
+
+ def _task_listener(state, details):
+ """Store the result of the task under the given flow in the log
+ book so that it can be retrieved later."""
+ task_id = details['task_uuid']
+ task = details['task']
+ flow = details['flow']
+ LOG.debug("Recording %s:%s of %s has finished state %s",
+ utils.get_task_name(task), task_id, flow, state)
+ # TODO(harlowja): switch to using uuids
+ flow_id = flow.name
+ metadata = {}
+ flow_details = self._logbook[flow_id]
+ if state in (states.SUCCESS, states.FAILURE):
+ metadata['result'] = details['result']
+ if task_id not in flow_details:
+ metadata['states'] = [state]
+ metadata['version'] = utils.get_task_version(task)
+ flow_details.add_task(task_id, metadata)
+ else:
+ details = flow_details[task_id]
+ immediate_version = utils.get_task_version(task)
+ recorded_version = details.metadata.get('version')
+ if recorded_version is not None:
+ if not utils.is_version_compatible(recorded_version,
+ immediate_version):
+ LOG.warn("Updating a task with a different version"
+ " than the one being listened to (%s != %s)",
+ recorded_version, immediate_version)
+ past_states = details.metadata.get('states', [])
+ if state not in past_states:
+ past_states.append(state)
+ details.metadata['states'] = past_states
+ if metadata:
+ details.metadata.update(metadata)
+
+ def _workflow_listener(state, details):
+ """Ensure that when we receive an event from said workflow that we
+ make sure a logbook entry exists for that flow."""
+ flow = details['flow']
+ old_state = details['old_state']
+ LOG.debug("%s has transitioned from %s to %s", flow, old_state,
+ state)
+ # TODO(harlowja): switch to using uuids
+ flow_id = flow.name
+ if flow_id in self._logbook:
+ return
+ self._logbook.add_flow(flow_id)
+
+ flow.task_notifier.register('*', _task_listener)
+ flow.notifier.register('*', _workflow_listener)
+
+ def _reconcile_versions(self, desired_version, task_details):
+ # For now don't do anything to reconcile the desired version
+ # with the actual version present in the task details, but in the
+ # future we could try to alter the task details to be in the older
+ # format (or use more complicated logic...)
+ return task_details
+
+ def _get_details(self, flow_details, runner):
+ task_id = runner.uuid
+ if task_id not in flow_details:
+ return (False, None)
+ details = flow_details[task_id]
+ has_completed = False
+ for state in details.metadata.get('states', []):
+ if state in (states.SUCCESS, states.FAILURE):
+ has_completed = True
+ break
+ if not has_completed:
+ return (False, None)
+ immediate_version = utils.get_task_version(runner.task)
+ recorded_version = details.metadata.get('version')
+ if recorded_version is not None:
+ if not utils.is_version_compatible(recorded_version,
+ immediate_version):
+ LOG.warn("Fetching runner metadata from a task with"
+ " a different version from the one being"
+ " processed (%s != %s)", recorded_version,
+ immediate_version)
+ details = self._reconcile_versions(immediate_version, details)
+ return (True, details)
+
+ def resume(self, flow, ordering):
+ """Splits the initial ordering into two segments, the first which
+ has already completed (or errored) and the second which has not
+ completed or errored."""
+
+ # TODO(harlowja): switch to using uuids
+ flow_id = flow.name
+ if flow_id not in self._logbook:
+ LOG.debug("No record of %s", flow)
+ return ([], ordering)
+ flow_details = self._logbook[flow_id]
+ ran_already = []
+ for r in ordering:
+ LOG.debug("Checking if ran %s of %s", r, flow)
+ (has_ran, details) = self._get_details(flow_details, r)
+ LOG.debug(has_ran)
+ if not has_ran:
+ # We need to put back the last task we took out since it did
+ # not run and therefore needs to; that's why we have this
+ # different iterator (which can do this).
+ return (ran_already, utils.LastFedIter(r, ordering))
+ LOG.debug("Already ran %s", r)
+ ran_already.append((r, details.metadata))
+ return (ran_already, iter([]))