summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-05 04:05:48 -0800
committerDana Powers <dana.powers@rd.io>2015-12-05 04:05:48 -0800
commit5682ff63534bb2579a6835d40afcad170f6bdd7c (patch)
treea6ab38737208ed5944c7839f106aadf03c13572c /kafka
parent2916bb865085ae2b883bc74dc4988b6b7723917d (diff)
downloadkafka-python-async_producer_stop.tar.gz
Producer.stop() now blocks until async thread completes (drop confusing timeout arg)async_producer_stop
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/base.py19
1 files changed, 12 insertions, 7 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index f2c7cfe..1ba4f5b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -415,17 +415,22 @@ class Producer(object):
raise
return resp
- def stop(self, timeout=1):
+ def stop(self):
"""
- Stop the producer. Optionally wait for the specified timeout before
- forcefully cleaning up.
+ Stop the producer (async mode). Blocks until async thread completes.
"""
+ if not self.async:
+ log.warning("producer.stop() called, but producer is not async")
+ return
+
+ if self.stopped:
+ log.warning("producer.stop() called, but producer is already stopped")
+ return
+
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.thread.join(timeout)
-
- if self.thread.is_alive():
- self.thread_stop_event.set()
+ self.thread_stop_event.set()
+ self.thread.join()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped