diff options
| author | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-10-03 17:03:22 +0400 |
|---|---|---|
| committer | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-10-09 19:24:27 +0400 |
| commit | c26fbb2387332268b7432d4d67f7df0a394df6a5 (patch) | |
| tree | 15708dff4deaf3d9ab4129a39f83391338822c37 /taskflow/examples | |
| parent | ce7e2ad38e9c136d5c2b43db8744e62605dd777c (diff) | |
| download | taskflow-c26fbb2387332268b7432d4d67f7df0a394df6a5.tar.gz | |
Resumption from backend for action engine
Simple refactoring and minor code adjustments to make resumption
from backend actually work:
- call engine.compile and check for missing dependencies
on every run;
- misc.Failure equality semantics adjusted;
- load failures from backend on every run.
Change-Id: I8a0462f2dec0ec66a19ee6a5ef10e4be48110e19
Diffstat (limited to 'taskflow/examples')
| -rw-r--r-- | taskflow/examples/resume_from_backend.out.txt | 21 | ||||
| -rw-r--r-- | taskflow/examples/resume_from_backend.py | 117 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows.out.txt | 32 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows.py | 71 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows/my_flows.py | 45 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows/my_utils.py | 31 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows/resume_all.py | 62 | ||||
| -rw-r--r-- | taskflow/examples/resume_many_flows/run_flow.py | 49 |
8 files changed, 428 insertions, 0 deletions
diff --git a/taskflow/examples/resume_from_backend.out.txt b/taskflow/examples/resume_from_backend.out.txt new file mode 100644 index 0000000..e4a30cf --- /dev/null +++ b/taskflow/examples/resume_from_backend.out.txt @@ -0,0 +1,21 @@ + +At the beginning, there is no state: +Flow state: None + +Running: +executing first==1.0 + +After running: +Flow state: SUSPENDED +boom==1.0: SUCCESS, result=None +first==1.0: SUCCESS, result=u'ok' +second==1.0: PENDING, result=None + +Resuming and running again: +executing second==1.0 + +At the end: +Flow state: SUCCESS +boom==1.0: SUCCESS, result=None +first==1.0: SUCCESS, result=u'ok' +second==1.0: SUCCESS, result=u'ok' diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py new file mode 100644 index 0000000..b9df1ca --- /dev/null +++ b/taskflow/examples/resume_from_backend.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines +from taskflow.patterns import linear_flow as lf +from taskflow.persistence import backends +from taskflow import task +from taskflow.utils import persistence_utils as p_utils + + +### UTILITY FUNCTIONS ######################################### + + +def print_task_states(flowdetail, msg): + print(msg) + print('Flow state: %s' % flowdetail.state) + items = sorted((td.name, td.version, td.state, td.results) + for td in flowdetail) + for item in items: + print("%s==%s: %s, result=%r" % item) + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend + + +def find_flow_detail(backend, lb_id, fd_id): + conn = backend.get_connection() + lb = conn.get_logbook(lb_id) + return lb.find(fd_id) + + +### CREATE FLOW ############################################### + + +class InterruptTask(task.Task): + def execute(self): + # DO NOT TRY THIS AT HOME + engine.suspend() + + +class TestTask(task.Task): + def execute(self): + print 'executing %s' % self + return 'ok' + + +def flow_factory(): + return lf.Flow('resume from backend example').add( + TestTask(name='first'), + InterruptTask(name='boom'), + TestTask(name='second')) + + +### INITIALIZE PERSISTENCE #################################### + +backend = get_backend() +logbook = p_utils.temporary_log_book(backend) + + +### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### + +flow = flow_factory() +flowdetail = p_utils.create_flow_detail(flow, logbook, backend) +engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) + +print_task_states(flowdetail, "\nAt the beginning, there is no state:") +print("\nRunning:") +engine.run() +print_task_states(flowdetail, "\nAfter running:") + + +### RE-CREATE, RESUME, RUN #################################### + +print("\nResuming and running again:") +# reload flowdetail from backend +flowdetail2 = find_flow_detail(backend, logbook.uuid, + flowdetail.uuid) +engine2 = taskflow.engines.load(flow_factory(), + flow_detail=flowdetail, + backend=backend) +engine2.run() +print_task_states(flowdetail, "\nAt the end:") diff --git a/taskflow/examples/resume_many_flows.out.txt b/taskflow/examples/resume_many_flows.out.txt new file mode 100644 index 0000000..bfaa072 --- /dev/null +++ b/taskflow/examples/resume_many_flows.out.txt @@ -0,0 +1,32 @@ +Run flow: +Running flow example 18995b55-aaad-49fa-938f-006ac21ea4c7 +executing first==1.0 +executing boom==1.0 +> this time not exiting +executing second==1.0 + + +Run flow, something happens: +Running flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648 +executing first==1.0 +executing boom==1.0 +> Critical error: boom = exit please + + +Run flow, something happens again: +Running flow example 16f11c15-4d8a-4552-b422-399565c873c4 +executing first==1.0 +executing boom==1.0 +> Critical error: boom = exit please + + +Resuming all failed flows +Resuming flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648 +executing boom==1.0 +> this time not exiting +executing second==1.0 +Resuming flow example 16f11c15-4d8a-4552-b422-399565c873c4 +executing boom==1.0 +> this time not exiting +executing second==1.0 + diff --git a/taskflow/examples/resume_many_flows.py b/taskflow/examples/resume_many_flows.py new file mode 100644 index 0000000..bab5832 --- /dev/null +++ b/taskflow/examples/resume_many_flows.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import subprocess +import sys +import tempfile + + +def _exec(cmd, add_env=None): + env = None + if add_env: + env = os.environ.copy() + env.update(add_env) + + proc = subprocess.Popen(cmd, env=env, stdin=None, + stdout=subprocess.PIPE, + stderr=sys.stderr) + + stdout, stderr = proc.communicate() + rc = proc.returncode + if rc != 0: + raise RuntimeError("Could not run %s [%s]", cmd, rc) + print stdout + + +def _path_to(name): + return os.path.abspath(os.path.join(os.path.dirname(__file__), + 'resume_many_flows', name)) + + +def main(): + try: + fd, db_path = tempfile.mkstemp(prefix='tf-resume-example') + os.close(fd) + backend_uri = 'sqlite:///%s' % db_path + + def run_example(name, add_env=None): + _exec([sys.executable, _path_to(name), backend_uri], add_env) + + print('Run flow:') + run_example('run_flow.py') + + print('\nRun flow, something happens:') + run_example('run_flow.py', {'BOOM': 'exit please'}) + + print('\nRun flow, something happens again:') + run_example('run_flow.py', {'BOOM': 'exit please'}) + + print('\nResuming all failed flows') + run_example('resume_all.py') + finally: + os.unlink(db_path) + +if __name__ == '__main__': + main() diff --git a/taskflow/examples/resume_many_flows/my_flows.py b/taskflow/examples/resume_many_flows/my_flows.py new file mode 100644 index 0000000..4cd81e6 --- /dev/null +++ b/taskflow/examples/resume_many_flows/my_flows.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from taskflow.patterns import linear_flow as lf +from taskflow import task + + +class UnfortunateTask(task.Task): + def execute(self): + print('executing %s' % self) + boom = os.environ.get('BOOM') + if boom: + print('> Critical error: boom = %s' % boom) + raise SystemExit() + else: + print('> this time not exiting') + + +class TestTask(task.Task): + def execute(self): + print('executing %s' % self) + + +def flow_factory(): + return lf.Flow('example').add( + TestTask(name='first'), + UnfortunateTask(name='boom'), + TestTask(name='second')) diff --git a/taskflow/examples/resume_many_flows/my_utils.py b/taskflow/examples/resume_many_flows/my_utils.py new file mode 100644 index 0000000..dd1fbd5 --- /dev/null +++ b/taskflow/examples/resume_many_flows/my_utils.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from taskflow.persistence import backends + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend diff --git a/taskflow/examples/resume_many_flows/resume_all.py b/taskflow/examples/resume_many_flows/resume_all.py new file mode 100644 index 0000000..82be307 --- /dev/null +++ b/taskflow/examples/resume_many_flows/resume_all.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath( + os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) + +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + + +import taskflow.engines + +from taskflow import states + +import my_flows # noqa +import my_utils # noqa + + +FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED) + + +def resume(flowdetail, backend): + print('Resuming flow %s %s' % (flowdetail.name, flowdetail.uuid)) + engine = taskflow.engines.load(my_flows.flow_factory(), + flow_detail=flowdetail, + backend=backend) + engine.run() + + +def main(): + backend = my_utils.get_backend() + logbooks = list(backend.get_connection().get_logbooks()) + for lb in logbooks: + for fd in lb: + if fd.state not in FINISHED_STATES: + resume(fd, backend) + + +if __name__ == '__main__': + main() diff --git a/taskflow/examples/resume_many_flows/run_flow.py b/taskflow/examples/resume_many_flows/run_flow.py new file mode 100644 index 0000000..523c984 --- /dev/null +++ b/taskflow/examples/resume_many_flows/run_flow.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath( + os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) + +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + +import taskflow.engines +from taskflow.utils import persistence_utils as p_utils + +import my_flows # noqa +import my_utils # noqa + + +backend = my_utils.get_backend() +logbook = p_utils.temporary_log_book(backend) + +flow = my_flows.flow_factory() + +flowdetail = p_utils.create_flow_detail(flow, logbook, backend) +engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) + +print('Running flow %s %s' % (flowdetail.name, flowdetail.uuid)) +engine.run() |
