diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-05 04:05:48 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-05 04:05:48 -0800 |
commit | 5682ff63534bb2579a6835d40afcad170f6bdd7c (patch) | |
tree | a6ab38737208ed5944c7839f106aadf03c13572c /kafka/producer/base.py | |
parent | 2916bb865085ae2b883bc74dc4988b6b7723917d (diff) | |
download | kafka-python-async_producer_stop.tar.gz |
Producer.stop() now blocks until async thread completes (drop confusing timeout arg)async_producer_stop
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r-- | kafka/producer/base.py | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index f2c7cfe..1ba4f5b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -415,17 +415,22 @@ class Producer(object): raise return resp - def stop(self, timeout=1): + def stop(self): """ - Stop the producer. Optionally wait for the specified timeout before - forcefully cleaning up. + Stop the producer (async mode). Blocks until async thread completes. """ + if not self.async: + log.warning("producer.stop() called, but producer is not async") + return + + if self.stopped: + log.warning("producer.stop() called, but producer is already stopped") + return + if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) - self.thread.join(timeout) - - if self.thread.is_alive(): - self.thread_stop_event.set() + self.thread_stop_event.set() + self.thread.join() if hasattr(self, '_cleanup_func'): # Remove cleanup handler now that we've stopped |