 README.md                |  34 ++-
 kafka/__init__.py        |   6 +-
 kafka/client.py          |  13 +-
 kafka/conn.py            |   2 +-
 kafka/producer.py        | 207 +++++++++++++++++++---
 test/test_integration.py | 277 +++++++++++++++++++++++++++-
 6 files changed, 509 insertions(+), 30 deletions(-)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -26,14 +26,46 @@ development, APIs are subject to change.
 ```python
 from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
 
 kafka = KafkaClient("localhost", 9092)
 
+# To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
 producer.send_messages("some message")
 producer.send_messages("this method", "is variadic")
 
+# To send messages asynchronously
+producer = SimpleProducer(kafka, "my-topic", async=True)
+producer.send_messages("async message")
+
+# To wait for acknowledgements
+# ACK_AFTER_LOCAL_WRITE : server will wait until the data is written to
+#                         a local log before sending a response
+# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is
+#                            committed by all in-sync replicas before
+#                            sending a response
+producer = SimpleProducer(kafka, "my-topic", async=False,
+                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                          ack_timeout=2000)
+
+response = producer.send_messages("sync message")
+
+if response:
+    print(response[0].error)
+    print(response[0].offset)
+
+# To send messages in batches. You can use any of the available
+# producers for doing this. The following producer will collect
+# messages in batches and send them to Kafka once 20 messages are
+# collected or every 60 seconds, whichever comes first.
+# Notes:
+# * If the producer dies before the messages are sent, they will be lost
+# * Call producer.stop() to flush pending messages and clean up
+producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+                          batch_send_every_n=20,
+                          batch_send_every_t=60)
+
+# To consume messages
 consumer = SimpleConsumer(kafka, "my-group", "my-topic")
 for message in consumer:
     print(message)
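The README hunk above starts importing KeyedProducer but never shows it in use. A minimal usage sketch, based on the KeyedProducer API added in kafka/producer.py below (the topic name and keys are placeholders):

```python
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner

kafka = KafkaClient("localhost", 9092)

# Messages with the same key always land on the same partition.
# HashedPartitioner is also the default when no partitioner is given.
producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner)
producer.send("key1", "some message")
producer.send("key2", "another message")
```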
diff --git a/kafka/__init__.py b/kafka/__init__.py
index d229169..e352c03 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,10 +9,12 @@ from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
 
             # Send the request, recv the response
             conn.send(requestId, request)
+
+            if decoder_fn is None:
+                continue
+
             response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response
 
         # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
+        return (acc[k] for k in original_keys) if acc else ()
 
     #################
     #   Public API  #
     #################
@@ -201,7 +205,12 @@ class KafkaClient(object):
         encoder = partial(KafkaProtocol.encode_produce_request,
                           acks=acks,
                           timeout=timeout)
-        decoder = KafkaProtocol.decode_produce_response
+
+        if acks == 0:
+            decoder = None
+        else:
+            decoder = KafkaProtocol.decode_produce_response
+
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
         out = []
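With acks=0 the broker sends no response at all; the client.py changes above drop the decoder, skip recv() entirely, and make the empty-accumulator case yield an empty sequence instead of failing on missing responses. A sketch of the resulting fire-and-forget call (the topic is a placeholder):

```python
from kafka.client import KafkaClient
from kafka.common import ProduceRequest
from kafka.protocol import create_message

client = KafkaClient("localhost", 9092)

# acks=0: no response is read back from the socket, decoding is
# skipped, and the result is an empty sequence.
reqs = [ProduceRequest("my-topic", 0, [create_message("no ack needed")])]
resps = client.send_produce_request(reqs, acks=0, timeout=1000)
assert list(resps) == []
```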
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
         sent = self._sock.sendall(payload)
         if sent != None:
             raise RuntimeError("Kafka went away")
-        self.data = self._consume_response()
 
     def recv(self, requestId):
         "Get a response from Kafka"
         log.debug("Reading response %d from Kafka" % requestId)
+        self.data = self._consume_response()
         return self.data
 
     def close(self):
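Moving _consume_response() out of send() and into recv() is what makes acks=0 workable: the socket is read only when a reply is actually expected. The resulting call pattern, where the request ids and payload variables are hypothetical placeholders:

```python
from kafka.conn import KafkaConnection

conn = KafkaConnection("localhost", 9092)

# Fire-and-forget produce (acks=0): the broker sends nothing back,
# so send() must not attempt a read -- after this change it doesn't.
conn.send(produce_request_id, produce_payload)

# A request that does expect a reply pairs send() with an explicit recv().
conn.send(metadata_request_id, metadata_payload)
data = conn.recv(metadata_request_id)
```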
diff --git a/kafka/producer.py b/kafka/producer.py
index 69c3830..06e468d 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,5 +1,10 @@
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
+from multiprocessing import Queue, Process
+from Queue import Empty
 import logging
+import sys
 
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
@@ -7,26 +12,175 @@ from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
 
-class SimpleProducer(object):
+STOP_ASYNC_PRODUCER = -1
+
+
+class Producer(object):
     """
-    A simple, round-robbin producer. Each message goes to exactly one partition
+    Base class to be used by producers
+
+    Params:
+    client - The Kafka client instance to use
+    async - If set to True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic):
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
         self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+        self.batch_send = batch_send
+        self.batch_size = batch_send_every_n
+        self.batch_time = batch_send_every_t
+
+        if self.async:
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
+            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc.start()
+
+    def _send_upstream(self, queue):
+        """
+        Listen on the queue for a specified number of messages or until
+        a specified timeout and send them upstream to the brokers in one
+        request
+        """
+        stop = False
+
+        while not stop:
+            timeout = self.batch_time
+            send_at = datetime.now() + timedelta(seconds=timeout)
+            count = self.batch_size
+            msgset = defaultdict(list)
+
+            # Keep fetching till we gather enough messages or a
+            # timeout is reached
+            while count > 0 and timeout >= 0:
+                try:
+                    partition, msg = queue.get(timeout=timeout)
+                except Empty:
+                    break
+
+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
+                # Adjust the timeout to match the remaining period
+                count -= 1
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)
+
+            # Send collected requests upstream
+            reqs = []
+            for partition, messages in msgset.items():
+                req = ProduceRequest(self.topic, partition, messages)
+                reqs.append(req)
+
+            try:
+                self.client.send_produce_request(reqs,
+                                                 acks=self.req_acks,
+                                                 timeout=self.ack_timeout)
+            except Exception:
+                log.error("Error sending message", exc_info=sys.exc_info())
+
+    def send_messages(self, partition, *msg):
+        """
+        Helper method to send produce requests
+        """
+        if self.async:
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
+        else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
+            resp = self.client.send_produce_request([req],
+                                                    acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
+
+
+class SimpleProducer(Producer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
+    """
+    def __init__(self, client, topic, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
-        self.next_partition = cycle(self.client.topic_partitions[topic])
+        client._load_metadata_for_topics(topic)
+        self.next_partition = cycle(client.topic_partitions[topic])
 
-    def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)
 
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+    def send_messages(self, *msg):
+        partition = self.next_partition.next()
+        return super(SimpleProducer, self).send_messages(partition, *msg)
 
 
-class KeyedProducer(object):
+class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
 
@@ -35,23 +189,34 @@ class KeyedProducer(object):
     topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
                   to send the message to. Must be derived from Partitioner
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic, partitioner=None):
-        self.client = client
+    def __init__(self, client, topic, partitioner=None, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
+        client._load_metadata_for_topics(topic)
 
         if not partitioner:
             partitioner = HashedPartitioner
-        self.partitioner = partitioner(self.client.topic_partitions[topic])
+        self.partitioner = partitioner(client.topic_partitions[topic])
+
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)
 
     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+        return self.send_messages(partition, msg)
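The core of batching is the collect-until-count-or-deadline loop in _send_upstream above. A single-process sketch of the same policy, using the stdlib Queue.Queue in place of multiprocessing.Queue (Python 2, as in the code above):

```python
from datetime import datetime, timedelta
from Queue import Queue, Empty  # Python 2 stdlib, as in producer.py

def drain(queue, batch_size, batch_time):
    # Gather up to batch_size items, or as many as arrive before
    # batch_time seconds elapse -- whichever limit is hit first.
    send_at = datetime.now() + timedelta(seconds=batch_time)
    timeout = batch_time
    batch = []
    while len(batch) < batch_size and timeout >= 0:
        try:
            batch.append(queue.get(timeout=timeout))
        except Empty:
            break
        # Shrink the timeout so the overall deadline stays fixed
        timeout = (send_at - datetime.now()).total_seconds()
    return batch
```

Note that when batch_send is False, __init__ forces batch_send_every_n to 1, so a plain async producer flushes each message as soon as the background process dequeues it.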
FetchRequest("test_round_robin_partitioner", 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + producer.stop() + + def test_hashed_partitioner(self): + producer = KeyedProducer(self.client, "test_hash_partitioner", + partitioner=HashedPartitioner) + producer.send(1, "one") + producer.send(2, "two") + producer.send(3, "three") + producer.send(4, "four") + + fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + producer.stop() + + def test_acks_none(self): + producer = SimpleProducer(self.client, "test_acks_none", + req_acks=SimpleProducer.ACK_NOT_REQUIRED) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 0) + + fetch = FetchRequest("test_acks_none", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_local_write(self): + producer = SimpleProducer(self.client, "test_acks_local_write", + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest("test_acks_local_write", 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_cluster_commit(self): + producer = SimpleProducer(self.client, "test_acks_cluster_commit", + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + resp = producer.send_messages("one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024) + fetch_resp = 
+
+    def test_acks_none(self):
+        producer = SimpleProducer(self.client, "test_acks_none",
+                                  req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+        resp = producer.send_messages("one")
+        self.assertEquals(len(resp), 0)
+
+        fetch = FetchRequest("test_acks_none", 0, 0, 1024)
+        fetch_resp = self.client.send_fetch_request([fetch])
+
+        self.assertEquals(fetch_resp[0].error, 0)
+        self.assertEquals(fetch_resp[0].highwaterMark, 1)
+        self.assertEquals(fetch_resp[0].partition, 0)
+
+        messages = list(fetch_resp[0].messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+
+        producer.stop()
+
+    def test_acks_local_write(self):
+        producer = SimpleProducer(
+            self.client, "test_acks_local_write",
+            req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+        resp = producer.send_messages("one")
+        self.assertEquals(len(resp), 1)
+
+        fetch = FetchRequest("test_acks_local_write", 0, 0, 1024)
+        fetch_resp = self.client.send_fetch_request([fetch])
+
+        self.assertEquals(fetch_resp[0].error, 0)
+        self.assertEquals(fetch_resp[0].highwaterMark, 1)
+        self.assertEquals(fetch_resp[0].partition, 0)
+
+        messages = list(fetch_resp[0].messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+
+        producer.stop()
+
+    def test_acks_cluster_commit(self):
+        producer = SimpleProducer(
+            self.client, "test_acks_cluster_commit",
+            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+        resp = producer.send_messages("one")
+        self.assertEquals(len(resp), 1)
+
+        fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024)
+        fetch_resp = self.client.send_fetch_request([fetch])
+
+        self.assertEquals(fetch_resp[0].error, 0)
+        self.assertEquals(fetch_resp[0].highwaterMark, 1)
+        self.assertEquals(fetch_resp[0].partition, 0)
+
+        messages = list(fetch_resp[0].messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+
+        producer.stop()
+
+    def test_async_simple_producer(self):
+        producer = SimpleProducer(self.client, "test_async_simple_producer",
+                                  async=True)
+
+        resp = producer.send_messages("one")
+        self.assertEquals(len(resp), 0)
+
+        # Give it some time
+        time.sleep(2)
+
+        fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024)
+        fetch_resp = self.client.send_fetch_request([fetch])
+
+        self.assertEquals(fetch_resp[0].error, 0)
+        self.assertEquals(fetch_resp[0].highwaterMark, 1)
+        self.assertEquals(fetch_resp[0].partition, 0)
+
+        messages = list(fetch_resp[0].messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+
+        producer.stop()
+
+    def test_async_keyed_producer(self):
+        producer = KeyedProducer(self.client, "test_async_keyed_producer",
+                                 async=True)
+
+        resp = producer.send("key1", "one")
+        self.assertEquals(len(resp), 0)
+
+        # Give it some time
+        time.sleep(2)
+
+        fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024)
+        fetch_resp = self.client.send_fetch_request([fetch])
+
+        self.assertEquals(fetch_resp[0].error, 0)
+        self.assertEquals(fetch_resp[0].highwaterMark, 1)
+        self.assertEquals(fetch_resp[0].partition, 0)
+
+        messages = list(fetch_resp[0].messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+
+        producer.stop()
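The async tests above rely on a fixed time.sleep(2) before fetching, which makes them sensitive to broker timing. A hypothetical polling helper (not part of this diff), built on the same FetchRequest API the tests already use, would make the wait explicit:

```python
import time

def wait_for_messages(client, topic, partition, expected, timeout=10.0):
    # Poll until `expected` messages are visible on the partition,
    # or give up after `timeout` seconds. FetchRequest comes from
    # kafka.common, star-imported at the top of this test module.
    deadline = time.time() + timeout
    while time.time() < deadline:
        fetch = FetchRequest(topic, partition, 0, 1024)
        resp = client.send_fetch_request([fetch])[0]
        if resp.error == 0 and len(list(resp.messages)) >= expected:
            return True
        time.sleep(0.1)
    return False
```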
+
+    def test_batched_simple_producer(self):
+        producer = SimpleProducer(self.client,
+                                  "test_batched_simple_producer",
+                                  batch_send=True,
+                                  batch_send_every_n=10,
+                                  batch_send_every_t=20)
+
+        # Send 5 messages and do a fetch
+        msgs = ["message-%d" % i for i in range(0, 5)]
+        resp = producer.send_messages(*msgs)
+
+        # Batch mode is async. No ack
+        self.assertEquals(len(resp), 0)
+
+        # Give it some time
+        time.sleep(2)
+
+        fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+        fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+                                                                   fetch2])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        messages = list(fetch_resp1.messages)
+        self.assertEquals(len(messages), 0)
+
+        self.assertEquals(fetch_resp2.error, 0)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 0)
+
+        # Send 5 more messages, wait for 2 seconds and do a fetch
+        msgs = ["message-%d" % i for i in range(5, 10)]
+        resp = producer.send_messages(*msgs)
+
+        # Give it some time
+        time.sleep(2)
+
+        fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+        fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+                                                                   fetch2])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        messages = list(fetch_resp1.messages)
+        self.assertEquals(len(messages), 5)
+
+        self.assertEquals(fetch_resp2.error, 0)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 5)
+
+        # Send 7 messages and wait for 20 seconds
+        msgs = ["message-%d" % i for i in range(10, 15)]
+        resp = producer.send_messages(*msgs)
+        msgs = ["message-%d" % i for i in range(15, 17)]
+        resp = producer.send_messages(*msgs)
+
+        fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+        fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+                                                                   fetch2])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp2.error, 0)
+        messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 0)
+
+        # Give it some time
+        time.sleep(22)
+
+        fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+        fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+                                                                   fetch2])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp2.error, 0)
+        messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 7)
+
+        producer.stop()
+
 
 class TestSimpleConsumer(unittest.TestCase):
     @classmethod