diff options
-rw-r--r-- | kafka/producer/base.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index bb7fd43..cf5abac 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -9,7 +9,7 @@ except ImportError: from Queue import Empty, Queue from collections import defaultdict -from threading import Thread +from threading import Thread, Event import six @@ -27,7 +27,7 @@ STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout): + req_acks, ack_timeout, stop_event): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -35,7 +35,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, """ stop = False - while not stop: + while not stop_event.is_set(): timeout = batch_time count = batch_size send_at = time.time() + timeout @@ -52,7 +52,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Check if the controller has requested us to stop if topic_partition == STOP_ASYNC_PRODUCER: - stop = True + stop_event.set() break # Adjust the timeout to match the remaining period @@ -136,6 +136,7 @@ class Producer(object): log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue + self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -143,7 +144,8 @@ class Producer(object): batch_send_every_t, batch_send_every_n, self.req_acks, - self.ack_timeout)) + self.ack_timeout, + self.thread_stop_event)) # Thread will die if main thread exits self.thread.daemon = True @@ -207,3 +209,6 @@ class Producer(object): if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread.join(timeout) + + if self.thread.is_alive(): + self.thread_stop_event.set() |