-rw-r--r-- | README.md            | 12 |
-rw-r--r-- | kafka/partitioner.py | 52 |
-rw-r--r-- | kafka/producer.py    | 32 |
3 files changed, 96 insertions, 0 deletions
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -41,6 +41,18 @@ for message in consumer:
 kafka.close()
 ```
 
+## Keyed messages
+from kafka.client import KafkaClient
+from kafka.producer import KeyedProducer
+from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
+
+kafka = KafkaClient("localhost", 9092)
+
+producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner)
+producer.send("key1", "some message")
+producer.send("key2", "this method")
+
+
 ## Low level
 
 ```python
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
new file mode 100644
index 0000000..0f49b07
--- /dev/null
+++ b/kafka/partitioner.py
@@ -0,0 +1,52 @@
+from itertools import cycle
+
+
+class Partitioner(object):
+    """
+    Base class for a partitioner
+    """
+    def __init__(self, partitions):
+        """
+        Initialize the partitioner
+
+        partitions - A list of available partitions (during startup)
+        """
+        self.partitions = partitions
+
+    def partition(self, key, partitions):
+        """
+        Takes a string key and the list of partitions as arguments and
+        returns the partition to be used for the message
+
+        partitions - The list of partitions is passed in every call. This
+                     may look like overhead, but it will be useful
+                     (in the future) when we handle cases like rebalancing
+        """
+        raise NotImplementedError('partition function has to be implemented')
+
+
+class RoundRobinPartitioner(Partitioner):
+    """
+    Implements a round robin partitioner which sends data to partitions
+    in a round robin fashion
+    """
+    def __init__(self, partitions):
+        self.partitions = partitions
+        self.iterpart = cycle(partitions)
+
+    def partition(self, key, partitions):
+        # Rebuild the cycle iterator if the partition list has changed
+        if self.partitions != partitions:
+            self.partitions, self.iterpart = partitions, cycle(partitions)
+        return next(self.iterpart)
+
+
+class HashedPartitioner(Partitioner):
+    """
+    Implements a partitioner which selects the target partition based on
+    the hash of the key
+    """
+    def partition(self, key, partitions):
+        size = len(partitions)
+        idx = hash(key) % size
+        return partitions[idx]
diff --git a/kafka/producer.py b/kafka/producer.py
index 589eb11..75f90c6 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -3,6 +3,7 @@ import logging
 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
+from kafka.partitioner import HashedPartitioner
 
 log = logging.getLogger("kafka")
 
 
@@ -23,3 +24,34 @@ class SimpleProducer(object):
 
         resp = self.client.send_produce_request([req])[0]
         assert resp.error == 0
+
+
+class KeyedProducer(object):
+    """
+    A producer which distributes messages to partitions based on the key
+
+    Args:
+    client - The kafka client instance
+    topic - The kafka topic to send messages to
+    partitioner - A partitioner class that will be used to get the partition
+                  to send the message to. Must be derived from Partitioner
+    """
+    def __init__(self, client, topic, partitioner=None):
+        self.client = client
+        self.topic = topic
+        self.client._load_metadata_for_topics(topic)
+
+        if not partitioner:
+            partitioner = HashedPartitioner
+
+        self.partitioner = partitioner(self.client.topic_partitions[topic])
+
+    def send(self, key, msg):
+        partitions = self.client.topic_partitions[self.topic]
+        partition = self.partitioner.partition(key, partitions)
+
+        req = ProduceRequest(self.topic, partition,
+                             messages=[create_message(msg)])
+
+        resp = self.client.send_produce_request([req])[0]
+        assert resp.error == 0
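As a reading aid, here is a small usage sketch of the API this diff introduces, in the spirit of the keyed-messages example added to the README hunk above. It exercises the RoundRobinPartitioner that the README example imports but never uses; the broker address and topic name are the placeholder values from that example, not part of this change.

```python
# Usage sketch only; mirrors the README example above with placeholder
# broker address and topic. Not part of the committed diff.
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)

# With RoundRobinPartitioner the key is still passed to send(), but the
# target partition simply cycles through the topic's partition list.
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
producer.send("key1", "some message")
producer.send("key2", "another message")

kafka.close()
```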