diff options
author | David Arthur <mumrah@gmail.com> | 2013-06-13 06:03:36 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-06-13 06:03:36 -0700 |
commit | 883eed1f8ce1af37c621ad6ec89dc993694fd29b (patch) | |
tree | 1af748932ca319bbe040d3aa1f73604701d5f2ed | |
parent | 77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff) | |
parent | 0723cf8beca7ff6433c79ca04ad1945b00eea6d1 (diff) | |
download | kafka-python-883eed1f8ce1af37c621ad6ec89dc993694fd29b.tar.gz |
Merge pull request #32 from mahendra/keyed
Implement support for keyed messages
-rw-r--r-- | README.md | 16 | ||||
-rw-r--r-- | kafka/client.py | 5 | ||||
-rw-r--r-- | kafka/partitioner.py | 56 | ||||
-rw-r--r-- | kafka/producer.py | 32 |
4 files changed, 109 insertions, 0 deletions
@@ -41,6 +41,22 @@ for message in consumer:
 kafka.close()
 ```
 
+## Keyed messages
+```python
+from kafka.client import KafkaClient
+from kafka.producer import KeyedProducer
+from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
+
+kafka = KafkaClient("localhost", 9092)
+
+# HashedPartitioner is default
+producer = KeyedProducer(kafka, "my-topic")
+producer.send("key1", "some message")
+producer.send("key2", "this method")
+
+producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
+```
+
 ## Low level
 
 ```python
diff --git a/kafka/client.py b/kafka/client.py
index 5595d49..1146798 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -73,7 +73,12 @@ class KafkaClient(object):
 
         self.brokers.update(brokers)
         self.topics_to_brokers = {}
+
         for topic, partitions in topics.items():
+            # Clear the list once before we add it. This removes stale entries
+            # and avoids duplicates
+            self.topic_partitions.pop(topic, None)
+
             if not partitions:
                 log.info("Partition is unassigned, delay for 1s and retry")
                 time.sleep(1)
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
new file mode 100644
index 0000000..84db4d5
--- /dev/null
+++ b/kafka/partitioner.py
@@ -0,0 +1,56 @@
+from itertools import cycle
+
+
+class Partitioner(object):
+    """
+    Base class for a partitioner
+    """
+    def __init__(self, partitions):
+        """
+        Initialize the partitioner
+
+        partitions - A list of available partitions (during startup)
+        """
+        self.partitions = partitions
+
+    def partition(self, key, partitions):
+        """
+        Takes a string key and the list of partitions as arguments and
+        returns the partition to be used for the message
+
+        partitions - The list of partitions is passed in every call. This
+                     may look like an overhead, but it will be useful
+                     (in future) when we handle cases like rebalancing
+        """
+        raise NotImplementedError('partition function has to be implemented')
+
+
+class RoundRobinPartitioner(Partitioner):
+    """
+    Implements a round robin partitioner which sends data to partitions
+    in a round robin fashion
+    """
+    def __init__(self, partitions):
+        self._set_partitions(partitions)
+
+    def _set_partitions(self, partitions):
+        self.partitions = partitions
+        self.iterpart = cycle(partitions)
+
+    def partition(self, key, partitions):
+        # Refresh the partition list if necessary
+        if self.partitions != partitions:
+            self._set_partitions(partitions)
+
+        return next(self.iterpart)
+
+
+class HashedPartitioner(Partitioner):
+    """
+    Implements a partitioner which selects the target partition based on
+    the hash of the key
+    """
+    def partition(self, key, partitions):
+        size = len(partitions)
+        idx = hash(key) % size
+        return partitions[idx]
diff --git a/kafka/producer.py b/kafka/producer.py
index 589eb11..69c3830 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -3,6 +3,7 @@ import logging
 
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
+from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
@@ -23,3 +24,34 @@ class SimpleProducer(object):
 
         resp = self.client.send_produce_request([req])[0]
         assert resp.error == 0
+
+
+class KeyedProducer(object):
+    """
+    A producer which distributes messages to partitions based on the key
+
+    Args:
+    client - The kafka client instance
+    topic - The kafka topic to send messages to
+    partitioner - A partitioner class that will be used to get the partition
+        to send the message to. Must be derived from Partitioner
+    """
+    def __init__(self, client, topic, partitioner=None):
+        self.client = client
+        self.topic = topic
+        self.client._load_metadata_for_topics(topic)
+
+        if not partitioner:
+            partitioner = HashedPartitioner
+
+        self.partitioner = partitioner(self.client.topic_partitions[topic])
+
+    def send(self, key, msg):
+        partitions = self.client.topic_partitions[self.topic]
+        partition = self.partitioner.partition(key, partitions)
+
+        req = ProduceRequest(self.topic, partition,
+                             messages=[create_message(msg)])
+
+        resp = self.client.send_produce_request([req])[0]
+        assert resp.error == 0