diff options
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 589eb11..69c3830 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -3,6 +3,7 @@ import logging from kafka.common import ProduceRequest from kafka.protocol import create_message +from kafka.partitioner import HashedPartitioner log = logging.getLogger("kafka") @@ -23,3 +24,34 @@ class SimpleProducer(object): resp = self.client.send_produce_request([req])[0] assert resp.error == 0 + + +class KeyedProducer(object): + """ + A producer which distributes messages to partitions based on the key + + Args: + client - The kafka client instance + topic - The kafka topic to send messages to + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + """ + def __init__(self, client, topic, partitioner=None): + self.client = client + self.topic = topic + self.client._load_metadata_for_topics(topic) + + if not partitioner: + partitioner = HashedPartitioner + + self.partitioner = partitioner(self.client.topic_partitions[topic]) + + def send(self, key, msg): + partitions = self.client.topic_partitions[self.topic] + partition = self.partitioner.partition(key, partitions) + + req = ProduceRequest(self.topic, partition, + messages=[create_message(msg)]) + + resp = self.client.send_produce_request([req])[0] + assert resp.error == 0 |