summaryrefslogtreecommitdiff
path: root/kafka/producer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--kafka/producer/base.py9
1 files changed, 7 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index a0d9ac1..0e005c5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -31,6 +31,8 @@ BATCH_SEND_MSG_COUNT = 20
BATCH_RETRY_OPTIONS = RetryOptions(
limit=0, backoff_ms=300, retry_on_timeouts=False)
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
STOP_ASYNC_PRODUCER = -1
@@ -159,12 +161,14 @@ class Producer(object):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- batch_retry_options=BATCH_RETRY_OPTIONS):
+ batch_retry_options=BATCH_RETRY_OPTIONS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE):
if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
+ assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
@@ -186,7 +190,8 @@ class Producer(object):
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
- self.queue = Queue() # Messages are sent through this queue
+ # Messages are sent through this queue
+ self.queue = Queue(async_queue_maxsize)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,