diff options
| author | Joshua Harlow <harlowja@gmail.com> | 2015-01-24 00:45:36 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2015-01-24 18:33:51 -0800 |
| commit | ca82e20efe8f5c5d50b3db89be0342710ef7f73b (patch) | |
| tree | e4682847164001d229d70f1ecc94739fd80958af /taskflow/engines/worker_based/executor.py | |
| parent | 1ae7a8e67b79f1ea7533525ef27271978365afe9 (diff) | |
| download | taskflow-ca82e20efe8f5c5d50b3db89be0342710ef7f73b.tar.gz | |
Add a thread bundle helper utility + tests
To make it easier to create a bunch of threads in a single
call (and stop them in a single call) create a concept of
a thread bundle (similar to a thread group) that will call
into a provided set of factories to get a thread, activate
callbacks to notify others that a thread is about to start
or stop and then perform the start or stop of the bound
threads in a orderly manner.
Change-Id: I7d233cccb230b716af41243ad27220b988eec14c
Diffstat (limited to 'taskflow/engines/worker_based/executor.py')
| -rw-r--r-- | taskflow/engines/worker_based/executor.py | 28 |
1 files changed, 10 insertions, 18 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index cda3745..8290ba6 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -59,10 +59,16 @@ class WorkerTaskExecutor(executor.TaskExecutor): transport=transport, transport_options=transport_options, retry_options=retry_options) - self._proxy_thread = None self._periodic = wt.PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD), [self._notify_topics]) - self._periodic_thread = None + self._helpers = tu.ThreadBundle() + self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), + after_start=lambda t: self._proxy.wait(), + before_join=lambda t: self._proxy.stop()) + self._helpers.bind(lambda: tu.daemon_thread(self._periodic.start), + before_join=lambda t: self._periodic.stop(), + after_join=lambda t: self._periodic.reset(), + before_start=lambda t: self._periodic.reset()) def _process_notify(self, notify, message): """Process notify message from remote side.""" @@ -226,24 +232,10 @@ class WorkerTaskExecutor(executor.TaskExecutor): def start(self): """Starts proxy thread and associated topic notification thread.""" - if not tu.is_alive(self._proxy_thread): - self._proxy_thread = tu.daemon_thread(self._proxy.start) - self._proxy_thread.start() - self._proxy.wait() - if not tu.is_alive(self._periodic_thread): - self._periodic.reset() - self._periodic_thread = tu.daemon_thread(self._periodic.start) - self._periodic_thread.start() + self._helpers.start() def stop(self): """Stops proxy thread and associated topic notification thread.""" - if self._periodic_thread is not None: - self._periodic.stop() - self._periodic_thread.join() - self._periodic_thread = None - if self._proxy_thread is not None: - self._proxy.stop() - self._proxy_thread.join() - self._proxy_thread = None + self._helpers.stop() self._requests_cache.clear(self._handle_expired_request) self._workers.clear() |
