author | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
commit | 5684af438e6cf871540aa8ea8b556737f56e9798 (patch) | |
tree | f9bb1f6046943946236ceae61d9266a8c14bdcfe /kafka | |
parent | ffdc08aeec040862d522914a480c135626a19e69 (diff) | |
parent | d2df8f54637490b1dbe858066b74710b57186016 (diff) | |
download | kafka-python-5684af438e6cf871540aa8ea8b556737f56e9798.tar.gz | |
Merge pull request #33 from mahendra/asyncproducer
Support for async producer
Merged locally, tests pass, +1
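For context, a minimal sketch of how the asynchronous mode added by this PR is driven. The broker address, port, and topic name are hypothetical, and the `KafkaClient` constructor shown here is assumed from the surrounding codebase rather than this diff:

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# Hypothetical broker and topic. With async=True the producer hands each
# message to a background process via a multiprocessing.Queue, and
# send_messages() returns immediately with an empty response list.
kafka = KafkaClient("localhost", 9092)
producer = SimpleProducer(kafka, "my-topic", async=True)
producer.send_messages("first message", "second message")

# Signal the background process to stop and join it; it is terminated
# if it is still alive after the timeout.
producer.stop(timeout=5)
```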
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/__init__.py | 6 |
-rw-r--r-- | kafka/client.py | 13 |
-rw-r--r-- | kafka/conn.py | 2 |
-rw-r--r-- | kafka/producer.py | 207 |
4 files changed, 202 insertions, 26 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index d229169..e352c03 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,10 +9,12 @@ from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
 
             # Send the request, recv the response
             conn.send(requestId, request)
+
+            if decoder_fn is None:
+                continue
+
             response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response
 
         # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
+        return (acc[k] for k in original_keys) if acc else ()
 
     #################
     #   Public API  #
@@ -201,7 +205,12 @@ class KafkaClient(object):
         encoder = partial(KafkaProtocol.encode_produce_request,
                           acks=acks, timeout=timeout)
-        decoder = KafkaProtocol.decode_produce_response
+
+        if acks == 0:
+            decoder = None
+        else:
+            decoder = KafkaProtocol.decode_produce_response
+
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
         out = []
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
         sent = self._sock.sendall(payload)
         if sent != None:
             raise RuntimeError("Kafka went away")
-        self.data = self._consume_response()
 
     def recv(self, requestId):
         "Get a response from Kafka"
         log.debug("Reading response %d from Kafka" % requestId)
+        self.data = self._consume_response()
         return self.data
 
     def close(self):
diff --git a/kafka/producer.py b/kafka/producer.py
index 69c3830..06e468d 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,5 +1,10 @@
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
+from multiprocessing import Queue, Process
+from Queue import Empty
 import logging
+import sys
 
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
@@ -7,26 +12,175 @@ from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
 
-class SimpleProducer(object):
+STOP_ASYNC_PRODUCER = -1
+
+
+class Producer(object):
     """
-    A simple, round-robbin producer. Each message goes to exactly one partition
+    Base class to be used by producers
+
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic):
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
         self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+        self.batch_send = batch_send
+        self.batch_size = batch_send_every_n
+        self.batch_time = batch_send_every_t
+
+        if self.async:
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
+            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc.start()
+
+    def _send_upstream(self, queue):
+        """
+        Listen on the queue for a specified number of messages or till
+        a specified timeout and send them upstream to the brokers in one
+        request
+        """
+        stop = False
+
+        while not stop:
+            timeout = self.batch_time
+            send_at = datetime.now() + timedelta(seconds=timeout)
+            count = self.batch_size
+            msgset = defaultdict(list)
+
+            # Keep fetching till we gather enough messages or a
+            # timeout is reached
+            while count > 0 and timeout >= 0:
+                try:
+                    partition, msg = queue.get(timeout=timeout)
+                except Empty:
+                    break
+
+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
+                # Adjust the timeout to match the remaining period
+                count -= 1
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)
+
+            # Send collected requests upstream
+            reqs = []
+            for partition, messages in msgset.items():
+                req = ProduceRequest(self.topic, partition, messages)
+                reqs.append(req)
+
+            try:
+                self.client.send_produce_request(reqs, acks=self.req_acks,
+                                                 timeout=self.ack_timeout)
+            except Exception as exp:
+                log.error("Error sending message", exc_info=sys.exc_info())
+
+    def send_messages(self, partition, *msg):
+        """
+        Helper method to send produce requests
+        """
+        if self.async:
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
+        else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
+            resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
+
+
+class SimpleProducer(Producer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
+    """
+    def __init__(self, client, topic, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
-        self.next_partition = cycle(self.client.topic_partitions[topic])
+        client._load_metadata_for_topics(topic)
+        self.next_partition = cycle(client.topic_partitions[topic])
 
-    def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)
 
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+    def send_messages(self, *msg):
+        partition = self.next_partition.next()
+        return super(SimpleProducer, self).send_messages(partition, *msg)
 
 
-class KeyedProducer(object):
+class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
 
@@ -35,23 +189,34 @@ class KeyedProducer(object):
     topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
         to send the message to. Must be derived from Partitioner
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic, partitioner=None):
-        self.client = client
+    def __init__(self, client, topic, partitioner=None, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
+        client._load_metadata_for_topics(topic)
 
         if not partitioner:
            partitioner = HashedPartitioner
-        self.partitioner = partitioner(self.client.topic_partitions[topic])
+        self.partitioner = partitioner(client.topic_partitions[topic])
+
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)
 
     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+        return self.send_messages(partition, msg)
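A companion sketch of the batching and keyed paths added above, reusing the `kafka` client from the earlier sketch (topic and key are hypothetical; note that `Producer.__init__` forces `async = True` whenever `batch_send` is set):

```python
from kafka.partitioner import HashedPartitioner
from kafka.producer import KeyedProducer

# Hypothetical topic. Queued messages are flushed upstream once 20
# accumulate or 60 seconds pass, whichever comes first.
producer = KeyedProducer(kafka, "my-topic",
                         partitioner=HashedPartitioner,
                         batch_send=True,
                         batch_send_every_n=20,
                         batch_send_every_t=60)
producer.send("user-1", "event payload")  # partition chosen by key hash
producer.stop()
```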