summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-12-15 15:47:58 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-12-15 15:52:39 -0800
commit97b4e18cc2b4e6c0ae7228ff70ef75dc4a5a1df7 (patch)
tree84fd5152dad6a4c480de4e435c10f9f3f3661e25 /taskflow
parent7dc11bae11e532cff5ac41c0ec942b1d4e76bc8b (diff)
downloadtaskflow-97b4e18cc2b4e6c0ae7228ff70ef75dc4a5a1df7.tar.gz
Base task executor should provide 'wait_for_any'
Instead of having each task executor reproduce the same code for 'wait_for_any' we can just have the base task implementation provide the function that everyone is replicating instead; making common code common and save the headache caused by the same code being in multiple places (which is bad for multiple reasons). Change-Id: Icea4b7e3df605ab11b17c248d05acb3f9c02a1ca
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/engines/action_engine/executor.py8
-rw-r--r--taskflow/engines/worker_based/executor.py5
-rw-r--r--taskflow/tests/unit/worker_based/test_executor.py6
3 files changed, 4 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index b7ff2f7..002068b 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -83,9 +83,9 @@ class TaskExecutor(object):
progress_callback=None):
"""Schedules task reversion."""
- @abc.abstractmethod
def wait_for_any(self, fs, timeout=None):
"""Wait for futures returned by this executor to complete."""
+ return async_utils.wait_for_any(fs, timeout=timeout)
def start(self):
"""Prepare to execute tasks."""
@@ -117,9 +117,6 @@ class SerialTaskExecutor(TaskExecutor):
fut.atom = task
return fut
- def wait_for_any(self, fs, timeout=None):
- return async_utils.wait_for_any(fs, timeout)
-
class ParallelTaskExecutor(TaskExecutor):
"""Executes tasks in parallel.
@@ -148,9 +145,6 @@ class ParallelTaskExecutor(TaskExecutor):
fut.atom = task
return fut
- def wait_for_any(self, fs, timeout=None):
- return async_utils.wait_for_any(fs, timeout)
-
def start(self):
if self._create_executor:
if self._max_workers is not None:
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index cd068e8..bdef7bf 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -27,7 +27,6 @@ from taskflow import exceptions as exc
from taskflow import logging
from taskflow import task as task_atom
from taskflow.types import timing as tt
-from taskflow.utils import async_utils
from taskflow.utils import misc
from taskflow.utils import reflection
from taskflow.utils import threading_utils as tu
@@ -248,10 +247,6 @@ class WorkerTaskExecutor(executor.TaskExecutor):
progress_callback, result=result,
failures=failures)
- def wait_for_any(self, fs, timeout=None):
- """Wait for futures returned by this executor to complete."""
- return async_utils.wait_for_any(fs, timeout)
-
def wait_for_workers(self, workers=1, timeout=None):
"""Waits for geq workers to notify they are ready to do work.
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index 5583421..f075e2f 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -58,7 +58,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.request_inst_mock.expired = False
self.request_inst_mock.task_cls = self.task.name
self.wait_for_any_mock = self.patch(
- 'taskflow.engines.worker_based.executor.async_utils.wait_for_any')
+ 'taskflow.engines.action_engine.executor.async_utils.wait_for_any')
self.message_mock = mock.MagicMock(name='message')
self.message_mock.properties = {'correlation_id': self.task_uuid,
'type': pr.RESPONSE}
@@ -289,7 +289,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
ex.wait_for_any(fs)
expected_calls = [
- mock.call(fs, None)
+ mock.call(fs, timeout=None)
]
self.assertEqual(self.wait_for_any_mock.mock_calls, expected_calls)
@@ -300,7 +300,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
ex.wait_for_any(fs, timeout)
master_mock_calls = [
- mock.call(fs, timeout)
+ mock.call(fs, timeout=timeout)
]
self.assertEqual(self.wait_for_any_mock.mock_calls, master_mock_calls)