summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-26 11:41:05 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-26 11:41:05 +0530
commitefb1dabd1343de3be3371720244d9c1300951bfd (patch)
treec9c2c648065b7b15f99ea1583d32d49ed1ce12f3
parentec251efc56ecf281897a14321270700ab1874202 (diff)
downloadkafka-python-efb1dabd1343de3be3371720244d9c1300951bfd.tar.gz
Add support for batched message send
Also improve on the logic for stopping the async Processor instance. Ensure that unsent messages are sent before it is stopped.
-rw-r--r--kafka/producer.py109
1 files changed, 100 insertions, 9 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 9ed0056..da7cd96 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,6 +1,9 @@
+from datetime import datetime, timedelta
from itertools import cycle
from multiprocessing import Queue, Process
+from Queue import Empty
import logging
+import sys
from kafka.common import ProduceRequest
from kafka.protocol import create_message
@@ -8,6 +11,11 @@ from kafka.partitioner import HashedPartitioner
log = logging.getLogger("kafka")
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
+
+STOP_ASYNC_PRODUCER = -1
+
class Producer(object):
"""
@@ -22,6 +30,9 @@ class Producer(object):
receive before responding to the request
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
+ batch_send - If True, messages are sent in batches
+ batch_send_every_n - If set, messages are sent in batches of this size
+ batch_send_every_t - If set, messages are sent after this timeout
"""
ACK_NOT_REQUIRED = 0 # No ack is required
@@ -30,28 +41,95 @@ class Producer(object):
DEFAULT_ACK_TIMEOUT = 1000
- def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
- ack_timeout=DEFAULT_ACK_TIMEOUT):
+ def __init__(self, client, async=False,
+ req_acks=ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=DEFAULT_ACK_TIMEOUT,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+ if batch_send:
+ async = True
+ assert batch_send_every_n > 0
+ assert batch_send_every_t > 0
+ handler = self._send_upstream_batches
+ else:
+ handler = self._send_upstream
+
self.client = client
self.async = async
self.req_acks = req_acks
self.ack_timeout = ack_timeout
+ self.batch_send = batch_send
+ self.batch_size = batch_send_every_n
+ self.batch_time = batch_send_every_t
if self.async:
self.queue = Queue() # Messages are sent through this queue
- self.proc = Process(target=self._send_upstream, args=(self.queue,))
+ self.proc = Process(target=handler, args=(self.queue,))
self.proc.daemon = True # Process will die if main thread exits
self.proc.start()
+ def _send_async_requests(self, reqs):
+ """
+ Send a bunch of requests upstream and log any errors
+ """
+ if not reqs:
+ return
+
+ # Ignore any acks in the async mode and just log exceptions
+ try:
+ self.client.send_produce_request(reqs, acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as exp:
+ log.error("Error sending message", exc_info=sys.exc_info())
+
def _send_upstream(self, queue):
"""
Listen on the queue for messages and send them upstream to the brokers
"""
while True:
req = queue.get()
- # Ignore any acks in the async mode
- self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+
+ if req == STOP_ASYNC_PRODUCER:
+ log.info("Stopping async producer")
+ break
+
+ self._send_async_requests([req])
+
+ def _send_upstream_batches(self, queue):
+ """
+ Listen on the queue for a specified number of messages or till
+ a specified timeout and send them upstream to the brokers in one
+ request
+ """
+ stop = False
+
+ while not stop:
+ timeout = self.batch_time
+ deadline = datetime.now() + timedelta(seconds=timeout)
+ count = self.batch_size
+ reqs = []
+
+ # Keep fetching till we gather enough messages or a
+ # timeout is reached
+ while count > 0 and timeout >= 0:
+ try:
+ req = queue.get(timeout=timeout)
+
+ if req == STOP_ASYNC_PRODUCER:
+ stop = True
+ break
+ reqs.append(req)
+ except Empty:
+ break
+
+ # Adjust the timeout to match the remaining period
+ count -= 1
+ timeout = (deadline - datetime.now()).total_seconds()
+
+ # Send collected requests upstream
+ self._send_async_requests(reqs)
def send_request(self, req):
"""
@@ -65,10 +143,17 @@ class Producer(object):
timeout=self.ack_timeout)
return resp
- def stop(self):
+ def stop(self, timeout=1):
+ """
+ Stop the producer. Optionally wait for the specified timeout before
+ forcefully cleaning up.
+ """
if self.async:
- self.proc.terminate()
- self.proc.join()
+ self.queue.put(STOP_ASYNC_PRODUCER)
+ self.proc.join(timeout)
+
+ if self.proc.is_alive():
+ self.proc.terminate()
class SimpleProducer(Producer):
@@ -84,6 +169,9 @@ class SimpleProducer(Producer):
receive before responding to the request
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
+ batch_send - If True, messages are sent in batches
+ batch_send_every_n - If set, messages are sent in batches of this size
+ batch_send_every_t - If set, messages are sent after this timeout
"""
def __init__(self, client, topic, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
@@ -114,6 +202,9 @@ class KeyedProducer(Producer):
thread (process). We will not wait for a response to these
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
+ batch_send - If True, messages are sent in batches
+ batch_send_every_n - If set, messages are sent in batches of this size
+ batch_send_every_t - If set, messages are sent after this timeout
"""
def __init__(self, client, topic, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,