summaryrefslogtreecommitdiff
path: root/taskflow/examples
diff options
context:
space:
mode:
authorIvan A. Melnikov <imelnikov@griddynamics.com>2013-10-03 17:03:22 +0400
committerIvan A. Melnikov <imelnikov@griddynamics.com>2013-10-09 19:24:27 +0400
commitc26fbb2387332268b7432d4d67f7df0a394df6a5 (patch)
tree15708dff4deaf3d9ab4129a39f83391338822c37 /taskflow/examples
parentce7e2ad38e9c136d5c2b43db8744e62605dd777c (diff)
downloadtaskflow-c26fbb2387332268b7432d4d67f7df0a394df6a5.tar.gz
Resumption from backend for action engine
Simple refactoring and minor code adjustments to make resumption from backend actually work: - call engine.compile and check for missing dependencies on every run; - misc.Failure equality semantics adjusted; - load failures from backend on every run. Change-Id: I8a0462f2dec0ec66a19ee6a5ef10e4be48110e19
Diffstat (limited to 'taskflow/examples')
-rw-r--r--taskflow/examples/resume_from_backend.out.txt21
-rw-r--r--taskflow/examples/resume_from_backend.py117
-rw-r--r--taskflow/examples/resume_many_flows.out.txt32
-rw-r--r--taskflow/examples/resume_many_flows.py71
-rw-r--r--taskflow/examples/resume_many_flows/my_flows.py45
-rw-r--r--taskflow/examples/resume_many_flows/my_utils.py31
-rw-r--r--taskflow/examples/resume_many_flows/resume_all.py62
-rw-r--r--taskflow/examples/resume_many_flows/run_flow.py49
8 files changed, 428 insertions, 0 deletions
diff --git a/taskflow/examples/resume_from_backend.out.txt b/taskflow/examples/resume_from_backend.out.txt
new file mode 100644
index 0000000..e4a30cf
--- /dev/null
+++ b/taskflow/examples/resume_from_backend.out.txt
@@ -0,0 +1,21 @@
+
+At the beginning, there is no state:
+Flow state: None
+
+Running:
+executing first==1.0
+
+After running:
+Flow state: SUSPENDED
+boom==1.0: SUCCESS, result=None
+first==1.0: SUCCESS, result=u'ok'
+second==1.0: PENDING, result=None
+
+Resuming and running again:
+executing second==1.0
+
+At the end:
+Flow state: SUCCESS
+boom==1.0: SUCCESS, result=None
+first==1.0: SUCCESS, result=u'ok'
+second==1.0: SUCCESS, result=u'ok'
diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py
new file mode 100644
index 0000000..b9df1ca
--- /dev/null
+++ b/taskflow/examples/resume_from_backend.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import sys
+
+logging.basicConfig(level=logging.ERROR)
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+import taskflow.engines
+from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import backends
+from taskflow import task
+from taskflow.utils import persistence_utils as p_utils
+
+
+### UTILITY FUNCTIONS #########################################
+
+
+def print_task_states(flowdetail, msg):
+ print(msg)
+ print('Flow state: %s' % flowdetail.state)
+ items = sorted((td.name, td.version, td.state, td.results)
+ for td in flowdetail)
+ for item in items:
+ print("%s==%s: %s, result=%r" % item)
+
+
+def get_backend():
+ try:
+ backend_uri = sys.argv[1]
+ except Exception:
+ backend_uri = 'sqlite://'
+
+ backend = backends.fetch({'connection': backend_uri})
+ backend.get_connection().upgrade()
+ return backend
+
+
+def find_flow_detail(backend, lb_id, fd_id):
+ conn = backend.get_connection()
+ lb = conn.get_logbook(lb_id)
+ return lb.find(fd_id)
+
+
+### CREATE FLOW ###############################################
+
+
+class InterruptTask(task.Task):
+ def execute(self):
+ # DO NOT TRY THIS AT HOME
+ engine.suspend()
+
+
+class TestTask(task.Task):
+ def execute(self):
+ print 'executing %s' % self
+ return 'ok'
+
+
+def flow_factory():
+ return lf.Flow('resume from backend example').add(
+ TestTask(name='first'),
+ InterruptTask(name='boom'),
+ TestTask(name='second'))
+
+
+### INITIALIZE PERSISTENCE ####################################
+
+backend = get_backend()
+logbook = p_utils.temporary_log_book(backend)
+
+
+### CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
+
+flow = flow_factory()
+flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
+engine = taskflow.engines.load(flow, flow_detail=flowdetail,
+ backend=backend)
+
+print_task_states(flowdetail, "\nAt the beginning, there is no state:")
+print("\nRunning:")
+engine.run()
+print_task_states(flowdetail, "\nAfter running:")
+
+
+### RE-CREATE, RESUME, RUN ####################################
+
+print("\nResuming and running again:")
+# reload flowdetail from backend
+flowdetail2 = find_flow_detail(backend, logbook.uuid,
+ flowdetail.uuid)
+engine2 = taskflow.engines.load(flow_factory(),
+ flow_detail=flowdetail,
+ backend=backend)
+engine2.run()
+print_task_states(flowdetail, "\nAt the end:")
diff --git a/taskflow/examples/resume_many_flows.out.txt b/taskflow/examples/resume_many_flows.out.txt
new file mode 100644
index 0000000..bfaa072
--- /dev/null
+++ b/taskflow/examples/resume_many_flows.out.txt
@@ -0,0 +1,32 @@
+Run flow:
+Running flow example 18995b55-aaad-49fa-938f-006ac21ea4c7
+executing first==1.0
+executing boom==1.0
+> this time not exiting
+executing second==1.0
+
+
+Run flow, something happens:
+Running flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648
+executing first==1.0
+executing boom==1.0
+> Critical error: boom = exit please
+
+
+Run flow, something happens again:
+Running flow example 16f11c15-4d8a-4552-b422-399565c873c4
+executing first==1.0
+executing boom==1.0
+> Critical error: boom = exit please
+
+
+Resuming all failed flows
+Resuming flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648
+executing boom==1.0
+> this time not exiting
+executing second==1.0
+Resuming flow example 16f11c15-4d8a-4552-b422-399565c873c4
+executing boom==1.0
+> this time not exiting
+executing second==1.0
+
diff --git a/taskflow/examples/resume_many_flows.py b/taskflow/examples/resume_many_flows.py
new file mode 100644
index 0000000..bab5832
--- /dev/null
+++ b/taskflow/examples/resume_many_flows.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import subprocess
+import sys
+import tempfile
+
+
+def _exec(cmd, add_env=None):
+ env = None
+ if add_env:
+ env = os.environ.copy()
+ env.update(add_env)
+
+ proc = subprocess.Popen(cmd, env=env, stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=sys.stderr)
+
+ stdout, stderr = proc.communicate()
+ rc = proc.returncode
+ if rc != 0:
+ raise RuntimeError("Could not run %s [%s]", cmd, rc)
+ print stdout
+
+
+def _path_to(name):
+ return os.path.abspath(os.path.join(os.path.dirname(__file__),
+ 'resume_many_flows', name))
+
+
+def main():
+ try:
+ fd, db_path = tempfile.mkstemp(prefix='tf-resume-example')
+ os.close(fd)
+ backend_uri = 'sqlite:///%s' % db_path
+
+ def run_example(name, add_env=None):
+ _exec([sys.executable, _path_to(name), backend_uri], add_env)
+
+ print('Run flow:')
+ run_example('run_flow.py')
+
+ print('\nRun flow, something happens:')
+ run_example('run_flow.py', {'BOOM': 'exit please'})
+
+ print('\nRun flow, something happens again:')
+ run_example('run_flow.py', {'BOOM': 'exit please'})
+
+ print('\nResuming all failed flows')
+ run_example('resume_all.py')
+ finally:
+ os.unlink(db_path)
+
+if __name__ == '__main__':
+ main()
diff --git a/taskflow/examples/resume_many_flows/my_flows.py b/taskflow/examples/resume_many_flows/my_flows.py
new file mode 100644
index 0000000..4cd81e6
--- /dev/null
+++ b/taskflow/examples/resume_many_flows/my_flows.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from taskflow.patterns import linear_flow as lf
+from taskflow import task
+
+
+class UnfortunateTask(task.Task):
+ def execute(self):
+ print('executing %s' % self)
+ boom = os.environ.get('BOOM')
+ if boom:
+ print('> Critical error: boom = %s' % boom)
+ raise SystemExit()
+ else:
+ print('> this time not exiting')
+
+
+class TestTask(task.Task):
+ def execute(self):
+ print('executing %s' % self)
+
+
+def flow_factory():
+ return lf.Flow('example').add(
+ TestTask(name='first'),
+ UnfortunateTask(name='boom'),
+ TestTask(name='second'))
diff --git a/taskflow/examples/resume_many_flows/my_utils.py b/taskflow/examples/resume_many_flows/my_utils.py
new file mode 100644
index 0000000..dd1fbd5
--- /dev/null
+++ b/taskflow/examples/resume_many_flows/my_utils.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from taskflow.persistence import backends
+
+
+def get_backend():
+ try:
+ backend_uri = sys.argv[1]
+ except Exception:
+ backend_uri = 'sqlite://'
+ backend = backends.fetch({'connection': backend_uri})
+ backend.get_connection().upgrade()
+ return backend
diff --git a/taskflow/examples/resume_many_flows/resume_all.py b/taskflow/examples/resume_many_flows/resume_all.py
new file mode 100644
index 0000000..82be307
--- /dev/null
+++ b/taskflow/examples/resume_many_flows/resume_all.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import sys
+
+logging.basicConfig(level=logging.ERROR)
+
+self_dir = os.path.abspath(os.path.dirname(__file__))
+top_dir = os.path.abspath(
+ os.path.join(self_dir, os.pardir, os.pardir, os.pardir))
+
+sys.path.insert(0, top_dir)
+sys.path.insert(0, self_dir)
+
+
+import taskflow.engines
+
+from taskflow import states
+
+import my_flows # noqa
+import my_utils # noqa
+
+
+FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED)
+
+
+def resume(flowdetail, backend):
+ print('Resuming flow %s %s' % (flowdetail.name, flowdetail.uuid))
+ engine = taskflow.engines.load(my_flows.flow_factory(),
+ flow_detail=flowdetail,
+ backend=backend)
+ engine.run()
+
+
+def main():
+ backend = my_utils.get_backend()
+ logbooks = list(backend.get_connection().get_logbooks())
+ for lb in logbooks:
+ for fd in lb:
+ if fd.state not in FINISHED_STATES:
+ resume(fd, backend)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/taskflow/examples/resume_many_flows/run_flow.py b/taskflow/examples/resume_many_flows/run_flow.py
new file mode 100644
index 0000000..523c984
--- /dev/null
+++ b/taskflow/examples/resume_many_flows/run_flow.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import sys
+
+logging.basicConfig(level=logging.ERROR)
+
+self_dir = os.path.abspath(os.path.dirname(__file__))
+top_dir = os.path.abspath(
+ os.path.join(self_dir, os.pardir, os.pardir, os.pardir))
+
+sys.path.insert(0, top_dir)
+sys.path.insert(0, self_dir)
+
+import taskflow.engines
+from taskflow.utils import persistence_utils as p_utils
+
+import my_flows # noqa
+import my_utils # noqa
+
+
+backend = my_utils.get_backend()
+logbook = p_utils.temporary_log_book(backend)
+
+flow = my_flows.flow_factory()
+
+flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
+engine = taskflow.engines.load(flow, flow_detail=flowdetail,
+ backend=backend)
+
+print('Running flow %s %s' % (flowdetail.name, flowdetail.uuid))
+engine.run()