summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-02-26 13:02:48 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-02-26 13:02:48 +0300
commitaedbbb39be4d207ba9eaf99811980276d44f39a5 (patch)
tree0da41526d31667325117744e8f8fb957a446d46f
parent1cce28715798cc5dbe84c793e306cd15769afd7c (diff)
downloadkafka-python-aedbbb39be4d207ba9eaf99811980276d44f39a5.tar.gz
Using threading.Event to stop async producer thread
-rw-r--r--kafka/producer/base.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index bb7fd43..cf5abac 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -9,7 +9,7 @@ except ImportError:
from Queue import Empty, Queue
from collections import defaultdict
-from threading import Thread
+from threading import Thread, Event
import six
@@ -27,7 +27,7 @@ STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout):
+ req_acks, ack_timeout, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
@@ -35,7 +35,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
"""
stop = False
- while not stop:
+ while not stop_event.is_set():
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
@@ -52,7 +52,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Check if the controller has requested us to stop
if topic_partition == STOP_ASYNC_PRODUCER:
- stop = True
+ stop_event.set()
break
# Adjust the timeout to match the remaining period
@@ -136,6 +136,7 @@ class Producer(object):
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
+ self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
self.client.copy(),
@@ -143,7 +144,8 @@ class Producer(object):
batch_send_every_t,
batch_send_every_n,
self.req_acks,
- self.ack_timeout))
+ self.ack_timeout,
+ self.thread_stop_event))
# Thread will die if main thread exits
self.thread.daemon = True
@@ -207,3 +209,6 @@ class Producer(object):
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.thread.join(timeout)
+
+ if self.thread.is_alive():
+ self.thread_stop_event.set()