diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-06-25 12:59:29 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-07-06 15:03:20 -0700 |
| commit | 06833fee4035b4797919f4f9a43c4b5ca270d1f8 (patch) | |
| tree | 3d8bdafd01cf80dd85de54eb82c1a49639aeda76 /taskflow/patterns | |
| parent | d746a93171ccfb5507f8a71b3e71814fc89d2a7f (diff) | |
| download | taskflow-06833fee4035b4797919f4f9a43c4b5ca270d1f8.tar.gz | |
Move how resuming is done to be disconnected from jobs/flows.
Instead of having resuming tied to a job, allow a workflow to
have a resumption strategy object that will split its initial
work order into two segments: one that has finished previously
and one that has not finished previously. Refactor the code that
previously tied a single resumption strategy to the job class
and move it to a more generic resumption module folder.
Change-Id: I8709cd6cb7a9deecefe8d2927be517a00acb422d
Diffstat (limited to 'taskflow/patterns')
| -rw-r--r-- | taskflow/patterns/base.py | 4 | ||||
| -rw-r--r-- | taskflow/patterns/graph_flow.py | 13 | ||||
| -rw-r--r-- | taskflow/patterns/linear_flow.py | 93 | ||||
| -rw-r--r-- | taskflow/patterns/resumption/__init__.py | 17 | ||||
| -rw-r--r-- | taskflow/patterns/resumption/logbook.py | 141 |
5 files changed, 208 insertions, 60 deletions
diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py index ccea366..a20a709 100644 --- a/taskflow/patterns/base.py +++ b/taskflow/patterns/base.py @@ -89,8 +89,8 @@ class Flow(object): def __str__(self): lines = ["Flow: %s" % (self.name)] - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (self.state)) + return "; ".join(lines) @abc.abstractmethod def add(self, task): diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 83a7433..c9683cb 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -50,6 +50,7 @@ class Flow(linear_flow.Flow): r = utils.Runner(task) self._graph.add_node(r, uuid=r.uuid) self._runners = [] + self._leftoff_at = None return r.uuid def _add_dependency(self, provider, requirer): @@ -58,11 +59,10 @@ class Flow(linear_flow.Flow): def __str__(self): lines = ["GraphFlow: %s" % (self.name)] - lines.append(" Number of tasks: %s" % (self._graph.number_of_nodes())) - lines.append(" Number of dependencies: %s" - % (self._graph.number_of_edges())) - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (self._graph.number_of_nodes())) + lines.append("%s" % (self._graph.number_of_edges())) + lines.append("%s" % (self.state)) + return "; ".join(lines) @decorators.locked def remove(self, task_uuid): @@ -76,10 +76,11 @@ class Flow(linear_flow.Flow): for r in remove_nodes: self._graph.remove_node(r) self._runners = [] + self._leftoff_at = None def _ordering(self): try: - return self._connect() + return iter(self._connect()) except g_exc.NetworkXUnfeasible: raise exc.InvalidStateException("Unable to correctly determine " "the path through the provided " diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 8122845..6ab0838 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -18,7 +18,6 @@ import collections import copy -import functools import 
logging from taskflow.openstack.common import excutils @@ -46,23 +45,17 @@ class Flow(base.Flow): # The tasks which have been applied will be collected here so that they # can be reverted in the correct order on failure. self._accumulator = utils.RollbackAccumulator() - # This should be a functor that returns whether a given task has - # already ran by returning a pair of (has_result, was_error, result). - # - # NOTE(harlowja): This allows for resumption by skipping tasks which - # have already occurred. The previous return value is needed due to - # the contract we have with tasks that they will be given the value - # they returned if reversion is triggered. - self.result_fetcher = None # Tasks results are stored here. Lookup is by the uuid that was # returned from the add function. self.results = {} - # The last index in the order we left off at before being - # interrupted (or failing). - self._left_off_at = 0 + # The previously left off iterator that can be used to resume from + # the last task (if interrupted and soft-reset). + self._leftoff_at = None # All runners to run are collected here. self._runners = [] self._connected = False + # The resumption strategy to use. 
+ self.resumer = None @decorators.locked def add_many(self, tasks): @@ -78,6 +71,7 @@ class Flow(base.Flow): r = utils.Runner(task) r.runs_before = list(reversed(self._runners)) self._connected = False + self._leftoff_at = None self._runners.append(r) return r.uuid @@ -104,10 +98,9 @@ class Flow(base.Flow): def __str__(self): lines = ["LinearFlow: %s" % (self.name)] - lines.append(" Number of tasks: %s" % (len(self._runners))) - lines.append(" Last index: %s" % (self._left_off_at)) - lines.append(" State: %s" % (self.state)) - return "\n".join(lines) + lines.append("%s" % (len(self._runners))) + lines.append("%s" % (self.state)) + return "; ".join(lines) @decorators.locked def remove(self, task_uuid): @@ -116,6 +109,7 @@ class Flow(base.Flow): if r.uuid == task_uuid: self._runners.pop(i) self._connected = False + self._leftoff_at = None removed = True break if not removed: @@ -132,22 +126,26 @@ class Flow(base.Flow): return self._runners def _ordering(self): - return self._connect() + return iter(self._connect()) @decorators.locked def run(self, context, *args, **kwargs): super(Flow, self).run(context, *args, **kwargs) - if self.result_fetcher: - result_fetcher = functools.partial(self.result_fetcher, context) - else: - result_fetcher = None + def resume_it(): + if self._leftoff_at is not None: + return ([], self._leftoff_at) + if self.resumer: + (finished, leftover) = self.resumer.resume(self, + self._ordering()) + else: + finished = [] + leftover = self._ordering() + return (finished, leftover) self._change_state(context, states.STARTED) try: - run_order = self._ordering() - if self._left_off_at > 0: - run_order = run_order[self._left_off_at:] + those_finished, leftover = resume_it() except Exception: with excutils.save_and_reraise_exception(): self._change_state(context, states.FAILURE) @@ -169,6 +167,9 @@ class Flow(base.Flow): result = runner(context, *args, **kwargs) else: if failed: + # TODO(harlowja): make this configurable?? 
+ # If we previously failed, we want to fail again at + # the same place. if not result: # If no exception or exception message was provided # or captured from the previous run then we need to @@ -196,8 +197,6 @@ class Flow(base.Flow): # intentionally). rb.result = result runner.result = result - # Alter the index we have ran at. - self._left_off_at += 1 self.results[runner.uuid] = copy.deepcopy(result) self.task_notifier.notify(states.SUCCESS, details={ 'context': context, @@ -207,7 +206,7 @@ class Flow(base.Flow): 'task_uuid': runner.uuid, }) except Exception as e: - cause = utils.FlowFailure(runner.task, self, e) + cause = utils.FlowFailure(runner, self, e) with excutils.save_and_reraise_exception(): # Notify any listeners that the task has errored. self.task_notifier.notify(states.FAILURE, details={ @@ -219,51 +218,41 @@ class Flow(base.Flow): }) self.rollback(context, cause) - # Ensure in a ready to run state. - for runner in run_order: - runner.reset() - - last_runner = 0 - was_interrupted = False - if result_fetcher: + if len(those_finished): self._change_state(context, states.RESUMING) - for (i, runner) in enumerate(run_order): - if self.state == states.INTERRUPTED: - was_interrupted = True - break - (has_result, was_error, result) = result_fetcher(self, - runner.task, - runner.uuid) - if not has_result: - break + for (r, details) in those_finished: # Fake running the task so that we trigger the same # notifications and state changes (and rollback that # would have happened in a normal flow). 
- last_runner = i + 1 - run_it(runner, failed=was_error, result=result, - simulate_run=True) + failed = states.FAILURE in details.get('states', []) + result = details.get('result') + run_it(r, failed=failed, result=result, simulate_run=True) - if was_interrupted: + self._leftoff_at = leftover + self._change_state(context, states.RUNNING) + if self.state == states.INTERRUPTED: return - self._change_state(context, states.RUNNING) - for runner in run_order[last_runner:]: + was_interrupted = False + for r in leftover: + r.reset() + run_it(r) if self.state == states.INTERRUPTED: was_interrupted = True break - run_it(runner) if not was_interrupted: # Only gets here if everything went successfully. self._change_state(context, states.SUCCESS) + self._leftoff_at = None @decorators.locked def reset(self): super(Flow, self).reset() self.results = {} - self.result_fetcher = None + self.resumer = None self._accumulator.reset() - self._left_off_at = 0 + self._leftoff_at = None self._connected = False @decorators.locked diff --git a/taskflow/patterns/resumption/__init__.py b/taskflow/patterns/resumption/__init__.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/patterns/resumption/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py new file mode 100644 index 0000000..a77cd66 --- /dev/null +++ b/taskflow/patterns/resumption/logbook.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from taskflow import states +from taskflow import utils + +LOG = logging.getLogger(__name__) + + +class Resumption(object): + # NOTE(harlowja): This allows for resumption by skipping tasks which + # have already occurred, aka fast-forwarding through a workflow to + # the last point it stopped (if possible). 
+ def __init__(self, logbook): + self._logbook = logbook + + def record_for(self, flow): + + def _task_listener(state, details): + """Store the result of the task under the given flow in the log + book so that it can be retrieved later.""" + task_id = details['task_uuid'] + task = details['task'] + flow = details['flow'] + LOG.debug("Recording %s:%s of %s has finished state %s", + utils.get_task_name(task), task_id, flow, state) + # TODO(harlowja): switch to using uuids + flow_id = flow.name + metadata = {} + flow_details = self._logbook[flow_id] + if state in (states.SUCCESS, states.FAILURE): + metadata['result'] = details['result'] + if task_id not in flow_details: + metadata['states'] = [state] + metadata['version'] = utils.get_task_version(task) + flow_details.add_task(task_id, metadata) + else: + details = flow_details[task_id] + immediate_version = utils.get_task_version(task) + recorded_version = details.metadata.get('version') + if recorded_version is not None: + if not utils.is_version_compatible(recorded_version, + immediate_version): + LOG.warn("Updating a task with a different version" + " than the one being listened to (%s != %s)", + recorded_version, immediate_version) + past_states = details.metadata.get('states', []) + if state not in past_states: + past_states.append(state) + details.metadata['states'] = past_states + if metadata: + details.metadata.update(metadata) + + def _workflow_listener(state, details): + """Ensure that when we receive an event from said workflow that we + make sure a logbook entry exists for that flow.""" + flow = details['flow'] + old_state = details['old_state'] + LOG.debug("%s has transitioned from %s to %s", flow, old_state, + state) + # TODO(harlowja): switch to using uuids + flow_id = flow.name + if flow_id in self._logbook: + return + self._logbook.add_flow(flow_id) + + flow.task_notifier.register('*', _task_listener) + flow.notifier.register('*', _workflow_listener) + + def _reconcile_versions(self, desired_version, 
task_details): + # For now don't do anything to reconcile the desired version + # from the actual version present in the task details, but in the + # future we could try to alter the task details to be in the older + # format (or more complicated logic...) + return task_details + + def _get_details(self, flow_details, runner): + task_id = runner.uuid + if task_id not in flow_details: + return (False, None) + details = flow_details[task_id] + has_completed = False + for state in details.metadata.get('states', []): + if state in (states.SUCCESS, states.FAILURE): + has_completed = True + break + if not has_completed: + return (False, None) + immediate_version = utils.get_task_version(runner.task) + recorded_version = details.metadata.get('version') + if recorded_version is not None: + if not utils.is_version_compatible(recorded_version, + immediate_version): + LOG.warn("Fetching runner metadata from a task with" + " a different version from the one being" + " processed (%s != %s)", recorded_version, + immediate_version) + details = self._reconcile_versions(immediate_version, details) + return (True, details) + + def resume(self, flow, ordering): + """Splits the initial ordering into two segments, the first which + has already completed (or errored) and the second which has not + completed or errored.""" + + # TODO(harlowja): switch to using uuids + flow_id = flow.name + if flow_id not in self._logbook: + LOG.debug("No record of %s", flow) + return ([], ordering) + flow_details = self._logbook[flow_id] + ran_already = [] + for r in ordering: + LOG.debug("Checking if ran %s of %s", r, flow) + (has_ran, details) = self._get_details(flow_details, r) + LOG.debug(has_ran) + if not has_ran: + # We need to put back the last task we took out since it did + # not run and therefore needs to, thats why we have this + # different iterator (which can do this). 
+ return (ran_already, utils.LastFedIter(r, ordering)) + LOG.debug("Already ran %s", r) + ran_already.append((r, details.metadata)) + return (ran_already, iter([])) |
