summaryrefslogtreecommitdiff
path: root/taskflow/engines/worker_based/executor.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-01-24 00:45:36 -0800
committerJoshua Harlow <harlowja@gmail.com>2015-01-24 18:33:51 -0800
commitca82e20efe8f5c5d50b3db89be0342710ef7f73b (patch)
treee4682847164001d229d70f1ecc94739fd80958af /taskflow/engines/worker_based/executor.py
parent1ae7a8e67b79f1ea7533525ef27271978365afe9 (diff)
downloadtaskflow-ca82e20efe8f5c5d50b3db89be0342710ef7f73b.tar.gz
Add a thread bundle helper utility + tests
To make it easier to create a bunch of threads in a single call (and stop them in a single call) create a concept of a thread bundle (similar to a thread group) that will call into a provided set of factories to get a thread, activate callbacks to notify others that a thread is about to start or stop and then perform the start or stop of the bound threads in a orderly manner. Change-Id: I7d233cccb230b716af41243ad27220b988eec14c
Diffstat (limited to 'taskflow/engines/worker_based/executor.py')
-rw-r--r--taskflow/engines/worker_based/executor.py28
1 files changed, 10 insertions, 18 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index cda3745..8290ba6 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -59,10 +59,16 @@ class WorkerTaskExecutor(executor.TaskExecutor):
transport=transport,
transport_options=transport_options,
retry_options=retry_options)
- self._proxy_thread = None
self._periodic = wt.PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD),
[self._notify_topics])
- self._periodic_thread = None
+ self._helpers = tu.ThreadBundle()
+ self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
+ after_start=lambda t: self._proxy.wait(),
+ before_join=lambda t: self._proxy.stop())
+ self._helpers.bind(lambda: tu.daemon_thread(self._periodic.start),
+ before_join=lambda t: self._periodic.stop(),
+ after_join=lambda t: self._periodic.reset(),
+ before_start=lambda t: self._periodic.reset())
def _process_notify(self, notify, message):
"""Process notify message from remote side."""
@@ -226,24 +232,10 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def start(self):
"""Starts proxy thread and associated topic notification thread."""
- if not tu.is_alive(self._proxy_thread):
- self._proxy_thread = tu.daemon_thread(self._proxy.start)
- self._proxy_thread.start()
- self._proxy.wait()
- if not tu.is_alive(self._periodic_thread):
- self._periodic.reset()
- self._periodic_thread = tu.daemon_thread(self._periodic.start)
- self._periodic_thread.start()
+ self._helpers.start()
def stop(self):
"""Stops proxy thread and associated topic notification thread."""
- if self._periodic_thread is not None:
- self._periodic.stop()
- self._periodic_thread.join()
- self._periodic_thread = None
- if self._proxy_thread is not None:
- self._proxy.stop()
- self._proxy_thread.join()
- self._proxy_thread = None
+ self._helpers.stop()
self._requests_cache.clear(self._handle_expired_request)
self._workers.clear()