-rw-r--r--  README.md                 | 26
-rw-r--r--  kafka/producer.py         | 79
-rw-r--r--  test/test_integration.py  | 82
3 files changed, 98 insertions(+), 89 deletions(-)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
 
 # Status
 
-The current version of this package is **0.9.0** and is compatible with
+The current version of this package is **0.9.0** and is compatible with
 Kafka brokers running version **0.8.1**.
 
 # Usage
@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
 kafka = KafkaClient("localhost", 9092)
 
 # To send messages synchronously
-producer = SimpleProducer(kafka, "my-topic")
-producer.send_messages("some message")
-producer.send_messages("this method", "is variadic")
+producer = SimpleProducer(kafka)
+producer.send_messages("my-topic", "some message")
+producer.send_messages("my-topic", "this method", "is variadic")
 
 # To send messages asynchronously
-producer = SimpleProducer(kafka, "my-topic", async=True)
-producer.send_messages("async message")
+producer = SimpleProducer(kafka, async=True)
+producer.send_messages("my-topic", "async message")
 
 # To wait for acknowledgements
 # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
 #                         a local log before sending response
 # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
 #                            by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, "my-topic", async=False,
+producer = SimpleProducer(kafka, async=False,
                           req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                           ack_timeout=2000)
 
-response = producer.send_messages("async message")
+response = producer.send_messages("my-topic", "async message")
 
 if response:
     print(response[0].error)
@@ -62,7 +62,7 @@ if response:
 # Notes:
 # * If the producer dies before the messages are sent, there will be losses
 # * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+producer = SimpleProducer(kafka, batch_send=True,
                           batch_send_every_n=20,
                           batch_send_every_t=60)
 
@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
 kafka = KafkaClient("localhost", 9092)
 
 # HashedPartitioner is default
-producer = KeyedProducer(kafka, "my-topic")
-producer.send("key1", "some message")
-producer.send("key2", "this methode")
+producer = KeyedProducer(kafka)
+producer.send("my-topic", "key1", "some message")
+producer.send("my-topic", "key2", "this methode")
 
-producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
+producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 ```
 
 ## Multiprocess consumer
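Read side by side, the README hunks show the point of this change: the topic moves from the producer constructor to each send call, so one producer instance can serve several topics. A minimal sketch against the new API shown above (the second topic name is hypothetical, and the imports are assumed to match the README's examples):

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost", 9092)

# One producer now serves any number of topics, because the topic is a
# per-call argument instead of construction-time state.
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", "event for topic one")
producer.send_messages("my-other-topic", "event for topic two")  # hypothetical topic
```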
diff --git a/kafka/producer.py b/kafka/producer.py
index 6b624f2..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import create_message
 
@@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
 STOP_ASYNC_PRODUCER = -1
 
 
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
     # timeout is reached
     while count > 0 and timeout >= 0:
         try:
-            partition, msg = queue.get(timeout=timeout)
+            topic_partition, msg = queue.get(timeout=timeout)
         except Empty:
             break
 
         # Check if the controller has requested us to stop
-        if partition == STOP_ASYNC_PRODUCER:
+        if topic_partition == STOP_ASYNC_PRODUCER:
             stop = True
             break
 
         # Adjust the timeout to match the remaining period
         count -= 1
         timeout = send_at - time.time()
-        msgset[partition].append(msg)
+        msgset[topic_partition].append(msg)
 
     # Send collected requests upstream
     reqs = []
-    for partition, messages in msgset.items():
-        req = ProduceRequest(topic, partition, messages)
+    for topic_partition, messages in msgset.items():
+        req = ProduceRequest(topic_partition.topic,
+                             topic_partition.partition,
+                             messages)
         reqs.append(req)
 
     try:
@@ -78,7 +81,6 @@ class Producer(object):
 
     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
    async - If set to true, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
-                                args=(self.topic,
-                                      self.queue,
+                                args=(self.queue,
                                       self.client.copy(),
                                       batch_send_every_t,
                                       batch_send_every_n,
@@ -131,17 +132,18 @@
             self.proc.daemon = True
             self.proc.start()
 
-    def send_messages(self, partition, *msg):
+    def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
         """
         if self.async:
             for m in msg:
-                self.queue.put((partition, create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition),
+                                create_message(m)))
             resp = []
         else:
             messages = [create_message(m) for m in msg]
-            req = ProduceRequest(self.topic, partition, messages)
+            req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                         timeout=self.ack_timeout)
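The core of the producer.py change so far: the async queue now carries (TopicAndPartition, message) tuples, and _send_upstream buckets them into one ProduceRequest per topic-partition pair. A standalone sketch of that grouping, using a local namedtuple as a stand-in for kafka.common.TopicAndPartition (assumed here to be a namedtuple-like value type; that is what makes it usable as a dict key):

```python
from collections import defaultdict, namedtuple

# Stand-in for kafka.common.TopicAndPartition. A namedtuple compares and
# hashes by value, so equal (topic, partition) pairs share one bucket.
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

msgset = defaultdict(list)
for tp, msg in [(TopicAndPartition("t1", 0), "a"),
                (TopicAndPartition("t2", 0), "b"),
                (TopicAndPartition("t1", 0), "c")]:
    msgset[tp].append(msg)

# One batch per (topic, partition): t1/0 -> ['a', 'c'], t2/0 -> ['b']
for tp, messages in msgset.items():
    print(tp.topic, tp.partition, messages)
```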
@@ -169,7 +171,6 @@
 
     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
     async - If True, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
     req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, async=False,
+    def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-        self.next_partition = cycle(client.topic_partitions[topic])
-
+        self.partition_cycles = {}
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)
 
-    def send_messages(self, *msg):
-        partition = self.next_partition.next()
-        return super(SimpleProducer, self).send_messages(partition, *msg)
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+        return self.partition_cycles[topic].next()
+
+    def send_messages(self, topic, *msg):
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<SimpleProducer batch=%s>' % self.async
 
 
 class KeyedProducer(Producer):
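The _next_partition helper above builds its round-robin state lazily: the first send to a topic loads metadata and caches an itertools.cycle over that topic's partitions, and later sends just advance the cycle. A small self-contained sketch of that pattern (the partition map below is hypothetical, standing in for client.topic_partitions):

```python
from itertools import cycle

topic_partitions = {"my-topic": [0, 1, 2]}  # hypothetical metadata
partition_cycles = {}

def next_partition(topic):
    # Build the round-robin iterator on first use, then reuse it.
    if topic not in partition_cycles:
        partition_cycles[topic] = cycle(topic_partitions[topic])
    return next(partition_cycles[topic])  # next() works on Python 2 and 3

print([next_partition("my-topic") for _ in range(5)])  # [0, 1, 2, 0, 1]
```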
@@ -209,7 +214,6 @@
 
     Args:
     client - The kafka client instance
-    topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
         to send the message to. Must be derived from Partitioner
     async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, partitioner=None, async=False,
+    def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-
         if not partitioner:
             partitioner = HashedPartitioner
-
-        self.partitioner = partitioner(client.topic_partitions[topic])
+        self.partitioner_class = partitioner
+        self.partitioners = {}
 
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)
 
-    def send(self, key, msg):
-        partitions = self.client.topic_partitions[self.topic]
-        partition = self.partitioner.partition(key, partitions)
-        return self.send_messages(partition, msg)
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partitioners[topic] = \
+                self.partitioner_class(self.client.topic_partitions[topic])
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key, self.client.topic_partitions[topic])
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<KeyedProducer batch=%s>' % self.async
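KeyedProducer applies the same lazy-caching pattern to partitioners: one partitioner instance per topic, so a given key keeps routing to the same partition across sends. A simplified stand-in (not the real kafka.partitioner.HashedPartitioner) that illustrates the stable-routing property the class relies on:

```python
# Simplified stand-in, not the actual kafka.partitioner.HashedPartitioner.
class SketchHashedPartitioner(object):
    def __init__(self, partitions):
        self.partitions = partitions

    def partition(self, key, partitions):
        # Equal keys always hash to the same slot in a fixed partition list.
        return partitions[hash(key) % len(partitions)]

p = SketchHashedPartitioner([0, 1])
assert p.partition("key1", [0, 1]) == p.partition("key1", [0, 1])  # stable
```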
producer.send(self.topic, "key3", "three") + producer.send(self.topic, "key4", "four") fetch1 = FetchRequest(self.topic, 0, 0, 1024) fetch2 = FetchRequest(self.topic, 1, 0, 1024) @@ -357,12 +357,12 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, self.topic, + producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - producer.send(1, "one") - producer.send(2, "two") - producer.send(3, "three") - producer.send(4, "four") + producer.send(self.topic, 1, "one") + producer.send(self.topic, 2, "two") + producer.send(self.topic, 3, "three") + producer.send(self.topic, 4, "four") fetch1 = FetchRequest(self.topic, 0, 0, 1024) fetch2 = FetchRequest(self.topic, 1, 0, 1024) @@ -391,9 +391,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_acks_none(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -410,9 +410,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_acks_local_write(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -430,9 +430,9 @@ class TestKafkaClient(KafkaTestCase): def test_acks_cluster_commit(self): producer = SimpleProducer( - self.client, self.topic, + self.client, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) fetch = FetchRequest(self.topic, 0, 0, 1024) @@ -449,8 +449,8 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_async_simple_producer(self): - producer = SimpleProducer(self.client, self.topic, async=True) - resp = producer.send_messages("one") + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) # Give it some time @@ -470,9 +470,9 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_async_keyed_producer(self): - producer = KeyedProducer(self.client, self.topic, async=True) + producer = KeyedProducer(self.client, async=True) - resp = producer.send("key1", "one") + resp = producer.send(self.topic, "key1", "one") self.assertEquals(len(resp), 0) # Give it some time @@ -492,14 +492,14 @@ class TestKafkaClient(KafkaTestCase): producer.stop() def test_batched_simple_producer(self): - producer = SimpleProducer(self.client, self.topic, + producer = SimpleProducer(self.client, batch_send=True, batch_send_every_n=10, batch_send_every_t=20) # Send 5 messages and do a fetch msgs = ["message-%d" % i for i in range(0, 5)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Batch mode is async. 
@@ -492,14 +492,14 @@
         producer.stop()
 
     def test_batched_simple_producer(self):
-        producer = SimpleProducer(self.client, self.topic,
+        producer = SimpleProducer(self.client,
                                   batch_send=True,
                                   batch_send_every_n=10,
                                   batch_send_every_t=20)
 
         # Send 5 messages and do a fetch
         msgs = ["message-%d" % i for i in range(0, 5)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         # Batch mode is async. No ack
         self.assertEquals(len(resp), 0)
@@ -522,7 +522,7 @@
 
         # Send 5 more messages, wait for 2 seconds and do a fetch
         msgs = ["message-%d" % i for i in range(5, 10)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         # Give it some time
         time.sleep(2)
@@ -542,9 +542,9 @@
 
         # Send 7 messages and wait for 20 seconds
         msgs = ["message-%d" % i for i in range(10, 15)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
         msgs = ["message-%d" % i for i in range(15, 17)]
-        resp = producer.send_messages(*msgs)
+        resp = producer.send_messages(self.topic, *msgs)
 
         fetch1 = FetchRequest(self.topic, 0, 5, 1024)
         fetch2 = FetchRequest(self.topic, 1, 5, 1024)
@@ -846,25 +846,25 @@ class TestFailover(KafkaTestCase):
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client, topic)
+        producer = SimpleProducer(self.client)
 
         for i in range(1, 4):
 
             # XXX unfortunately, the conns dict needs to be warmed for this to work
             # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             # kil leader for partition 0
             broker = self._kill_leader(topic, partition)
 
             # expect failure, reload meta data
             with self.assertRaises(FailedPayloadsError):
-                producer.send_messages('part 1')
-                producer.send_messages('part 2')
+                producer.send_messages(self.topic, 'part 1')
+                producer.send_messages(self.topic, 'part 2')
             time.sleep(1)
 
             # send to new leader
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             broker.open()
             time.sleep(3)
 
@@ -877,22 +877,22 @@
     def test_switch_leader_async(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client, topic, async=True)
+        producer = SimpleProducer(self.client, async=True)
 
         for i in range(1, 4):
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             # kil leader for partition 0
             broker = self._kill_leader(topic, partition)
 
             # expect failure, reload meta data
-            producer.send_messages('part 1')
-            producer.send_messages('part 2')
+            producer.send_messages(self.topic, 'part 1')
+            producer.send_messages(self.topic, 'part 2')
             time.sleep(1)
 
             # send to new leader
-            self._send_random_messages(producer, 10)
+            self._send_random_messages(producer, self.topic, 10)
 
             broker.open()
             time.sleep(3)
 
@@ -903,9 +903,9 @@
 
         producer.stop()
 
-    def _send_random_messages(self, producer, n):
+    def _send_random_messages(self, producer, topic, n):
         for j in range(n):
-            resp = producer.send_messages(random_string(10))
+            resp = producer.send_messages(topic, random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
         time.sleep(1)  # give it some time