diff options
Diffstat (limited to 'taskflow')
| -rw-r--r-- | taskflow/engines/action_engine/executor.py | 8 | ||||
| -rw-r--r-- | taskflow/engines/worker_based/executor.py | 5 | ||||
| -rw-r--r-- | taskflow/tests/unit/worker_based/test_executor.py | 6 |
3 files changed, 4 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index b7ff2f7..002068b 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -83,9 +83,9 @@ class TaskExecutor(object): progress_callback=None): """Schedules task reversion.""" - @abc.abstractmethod def wait_for_any(self, fs, timeout=None): """Wait for futures returned by this executor to complete.""" + return async_utils.wait_for_any(fs, timeout=timeout) def start(self): """Prepare to execute tasks.""" @@ -117,9 +117,6 @@ class SerialTaskExecutor(TaskExecutor): fut.atom = task return fut - def wait_for_any(self, fs, timeout=None): - return async_utils.wait_for_any(fs, timeout) - class ParallelTaskExecutor(TaskExecutor): """Executes tasks in parallel. @@ -148,9 +145,6 @@ class ParallelTaskExecutor(TaskExecutor): fut.atom = task return fut - def wait_for_any(self, fs, timeout=None): - return async_utils.wait_for_any(fs, timeout) - def start(self): if self._create_executor: if self._max_workers is not None: diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index cd068e8..bdef7bf 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -27,7 +27,6 @@ from taskflow import exceptions as exc from taskflow import logging from taskflow import task as task_atom from taskflow.types import timing as tt -from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import reflection from taskflow.utils import threading_utils as tu @@ -248,10 +247,6 @@ class WorkerTaskExecutor(executor.TaskExecutor): progress_callback, result=result, failures=failures) - def wait_for_any(self, fs, timeout=None): - """Wait for futures returned by this executor to complete.""" - return async_utils.wait_for_any(fs, timeout) - def wait_for_workers(self, workers=1, timeout=None): """Waits for geq workers to notify they are ready to do work. diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 5583421..f075e2f 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -58,7 +58,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.request_inst_mock.expired = False self.request_inst_mock.task_cls = self.task.name self.wait_for_any_mock = self.patch( - 'taskflow.engines.worker_based.executor.async_utils.wait_for_any') + 'taskflow.engines.action_engine.executor.async_utils.wait_for_any') self.message_mock = mock.MagicMock(name='message') self.message_mock.properties = {'correlation_id': self.task_uuid, 'type': pr.RESPONSE} @@ -289,7 +289,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.wait_for_any(fs) expected_calls = [ - mock.call(fs, None) + mock.call(fs, timeout=None) ] self.assertEqual(self.wait_for_any_mock.mock_calls, expected_calls) @@ -300,7 +300,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.wait_for_any(fs, timeout) master_mock_calls = [ - mock.call(fs, timeout) + mock.call(fs, timeout=timeout) ] self.assertEqual(self.wait_for_any_mock.mock_calls, master_mock_calls) |
