summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py19
1 files changed, 12 insertions, 7 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index f2c7cfe..1ba4f5b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -415,17 +415,22 @@ class Producer(object):
raise
return resp
- def stop(self, timeout=1):
+ def stop(self):
"""
- Stop the producer. Optionally wait for the specified timeout before
- forcefully cleaning up.
+ Stop the producer (async mode). Blocks until async thread completes.
"""
+ if not self.async:
+ log.warning("producer.stop() called, but producer is not async")
+ return
+
+ if self.stopped:
+ log.warning("producer.stop() called, but producer is already stopped")
+ return
+
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.thread.join(timeout)
-
- if self.thread.is_alive():
- self.thread_stop_event.set()
+ self.thread_stop_event.set()
+ self.thread.join()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped