author    | Mahendra M <mahendra.m@gmail.com> | 2013-06-26 11:41:05 +0530
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-26 11:41:05 +0530
commit    | efb1dabd1343de3be3371720244d9c1300951bfd (patch)
tree      | c9c2c648065b7b15f99ea1583d32d49ed1ce12f3
parent    | ec251efc56ecf281897a14321270700ab1874202 (diff)
download  | kafka-python-efb1dabd1343de3be3371720244d9c1300951bfd.tar.gz
Add support for batched message send
Also improve the logic for stopping the async producer process.
Ensure that unsent messages are sent before it is stopped.
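As a rough usage sketch of the batching mode this patch adds (the broker address, topic name, `KafkaClient` import, and `send_messages` call are assumptions based on kafka-python of this era; only the `batch_send*` keyword arguments and `stop(timeout)` come from the patch itself):

```python
from kafka.client import KafkaClient        # import path assumed for this era
from kafka.producer import SimpleProducer   # module modified by this patch

kafka = KafkaClient("localhost", 9092)      # placeholder broker address

# batch_send=True implies async: messages are queued to a background process
# and flushed every 20 messages or every 20 seconds, whichever comes first
# (the defaults added by this patch).
producer = SimpleProducer(kafka, "my-topic",
                          batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=20)

producer.send_messages("one")               # send_messages is assumed API
producer.send_messages("two", "three")

producer.stop(timeout=5)                    # drain the queue, then clean up
```

Note that in batch (and async) mode, produce errors are only logged by the background process; the caller does not see acks or exceptions.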
-rw-r--r-- | kafka/producer.py | 109
1 file changed, 100 insertions, 9 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 9ed0056..da7cd96 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,6 +1,9 @@
+from datetime import datetime
 from itertools import cycle
 from multiprocessing import Queue, Process
+from Queue import Empty
 import logging
+import sys
 
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
@@ -8,6 +11,11 @@ from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
+
+STOP_ASYNC_PRODUCER = -1
+
 
 class Producer(object):
     """
@@ -22,6 +30,9 @@ class Producer(object):
                receive before responding to the request
     ack_timeout - Value (in milliseconds) indicating a timeout for
                   waiting for an acknowledgement
+    batch_send - If True, messages are send in batches
+    batch_send_every_n - If set, messages are send in batches of this size
+    batch_send_every_t - If set, messages are send after this timeout
     """
 
     ACK_NOT_REQUIRED = 0            # No ack is required
@@ -30,28 +41,95 @@ class Producer(object):
 
     DEFAULT_ACK_TIMEOUT = 1000
 
-    def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=DEFAULT_ACK_TIMEOUT):
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+            handler = self._send_upstream_batched
+        else:
+            handler = self._send_upstream
+
         self.client = client
         self.async = async
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
+        self.batch_send = batch_send
+        self.batch_size = batch_send_every_n
+        self.batch_time = batch_send_every_t
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=self._send_upstream, args=(self.queue,))
+            self.proc = Process(target=handler, args=(self.queue,))
             self.proc.daemon = True  # Process will die if main thread exits
             self.proc.start()
 
+    def _send_async_requests(self, reqs):
+        """
+        Send a bunch of requests upstream and log any errors
+        """
+        if not reqs:
+            return
+
+        # Ignore any acks in the async mode and just log exceptions
+        try:
+            self.client.send_produce_request(reqs, acks=self.req_acks,
+                                             timeout=self.ack_timeout)
+        except Exception as exp:
+            log.error("Error sending message", exc_info=sys.exc_info())
+
     def _send_upstream(self, queue):
         """
         Listen on the queue for messages and send them upstream to the brokers
         """
         while True:
             req = queue.get()
-            # Ignore any acks in the async mode
-            self.client.send_produce_request([req], acks=self.req_acks,
-                                             timeout=self.ack_timeout)
+
+            if req == STOP_ASYNC_PRODUCER:
+                log.info("Stopping async producer")
+                break
+
+            self._send_async_requests([req])
+
+    def _send_upstream_batches(self, queue):
+        """
+        Listen on the queue for a specified number of messages or till
+        a specified timeout and send them upstream to the brokers in one
+        request
+        """
+        stop = False
+
+        while not stop:
+            timeout = self.batch_time
+            stop = datetime.now() + timedelta(seconds=timeout)
+            count = self.batch_size
+            reqs = []
+
+            # Keep fetching till we gather enough messages or a
+            # timeout is reached
+            while count > 0 and timeout >= 0:
+                try:
+                    req = queue.get(timeout)
+
+                    if req == STOP_ASYNC_PRODUCER:
+                        stop = True
+                        break
+                    reqs.append(req)
+                except Empty:
+                    break
+
+                # Adjust the timeout to match the remaining period
+                count -= 1
+                timeout = (stop - datetime.now()).total_seconds()
+
+            # Send collected requests upstream
+            self._send_async_requests(reqs)
 
     def send_request(self, req):
         """
@@ -65,10 +143,17 @@ class Producer(object):
                                                 timeout=self.ack_timeout)
         return resp
 
-    def stop(self):
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
         if self.async:
-            self.proc.terminate()
-            self.proc.join()
+            self.queue.put(STOP_ASYNC_PRODUCER)
+            self.proc.join(timeout)
+
+            if self.proc.is_alive()
+                self.proc.terminate()
 
 class SimpleProducer(Producer):
     """
@@ -84,6 +169,9 @@ class SimpleProducer(Producer):
                receive before responding to the request
     ack_timeout - Value (in milliseconds) indicating a timeout for
                   waiting for an acknowledgement
+    batch_send - If True, messages are send in batches
+    batch_send_every_n - If set, messages are send in batches of this size
+    batch_send_every_t - If set, messages are send after this timeout
     """
     def __init__(self, client, topic, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
@@ -114,6 +202,9 @@ class KeyedProducer(Producer):
                thread (process). We will not wait for a response to these
     ack_timeout - Value (in milliseconds) indicating a timeout for
                   waiting for an acknowledgement
+    batch_send - If True, messages are send in batches
+    batch_send_every_n - If set, messages are send in batches of this size
+    batch_send_every_t - If set, messages are send after this timeout
     """
     def __init__(self, client, topic, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
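To make the collect-then-flush loop easier to follow outside the diff, here is a small self-contained sketch of the same idea. It is an illustration rather than the patch itself: the `send` callable stands in for `client.send_produce_request`, `timedelta` is imported explicitly, and the queue timeout is passed as a keyword argument.

```python
from datetime import datetime, timedelta
from Queue import Empty                     # Python 2 stdlib, as in the patch

STOP_ASYNC_PRODUCER = -1                    # sentinel telling the worker to exit


def batch_loop(queue, send, batch_size=20, batch_time=20):
    """Gather up to batch_size requests or wait at most batch_time seconds,
    then flush whatever was collected with a single send() call."""
    stop = False
    while not stop:
        deadline = datetime.now() + timedelta(seconds=batch_time)
        reqs = []
        while len(reqs) < batch_size:
            remaining = (deadline - datetime.now()).total_seconds()
            if remaining <= 0:
                break                       # batch window elapsed
            try:
                req = queue.get(timeout=remaining)
            except Empty:
                break                       # window elapsed while waiting
            if req == STOP_ASYNC_PRODUCER:
                stop = True                 # flush what we have, then exit
                break
            reqs.append(req)
        if reqs:
            send(reqs)                      # one produce request per batch
```

The shutdown path in the patch pairs with this loop: `stop()` puts the sentinel on the queue, joins the worker for the given timeout, and only falls back to `terminate()` if the process is still alive.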