diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-01-30 17:00:55 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-01-30 17:00:55 -0800 |
commit | 4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (patch) | |
tree | a670e3399b44ff66ea730a4a53513af178cd56e5 /kafka | |
parent | c9d9d0aad2447bb8bad0e62c97365e5101001e4b (diff) | |
parent | f6df696e0ab11ec931283dcca8c518cd54d57687 (diff) | |
download | kafka-python-4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb.tar.gz |
Merge pull request #111 from rdiomar/multitopic_producers
Make producers take a topic argument at send rather than init time -- fixes Issue #110, but breaks backwards compatibility with previous Producer interface.
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer.py | 79 |
1 files changed, 44 insertions, 35 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 6b624f2..12a2934 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -8,7 +8,7 @@ from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from kafka.common import ProduceRequest +from kafka.common import ProduceRequest, TopicAndPartition from kafka.partitioner import HashedPartitioner from kafka.protocol import create_message @@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20 STOP_ASYNC_PRODUCER = -1 -def _send_upstream(topic, queue, client, batch_time, batch_size, +def _send_upstream(queue, client, batch_time, batch_size, req_acks, ack_timeout): """ Listen on the queue for a specified number of messages or till @@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, # timeout is reached while count > 0 and timeout >= 0: try: - partition, msg = queue.get(timeout=timeout) + topic_partition, msg = queue.get(timeout=timeout) + except Empty: break # Check if the controller has requested us to stop - if partition == STOP_ASYNC_PRODUCER: + if topic_partition == STOP_ASYNC_PRODUCER: stop = True break # Adjust the timeout to match the remaining period count -= 1 timeout = send_at - time.time() - msgset[partition].append(msg) + msgset[topic_partition].append(msg) # Send collected requests upstream reqs = [] - for partition, messages in msgset.items(): - req = ProduceRequest(topic, partition, messages) + for topic_partition, messages in msgset.items(): + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + messages) reqs.append(req) try: @@ -78,7 +81,6 @@ class Producer(object): Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -119,8 +121,7 @@ class Producer(object): if self.async: self.queue = Queue() # Messages are sent through this queue self.proc = Process(target=_send_upstream, - args=(self.topic, - self.queue, + args=(self.queue, self.client.copy(), batch_send_every_t, batch_send_every_n, @@ -131,17 +132,18 @@ class Producer(object): self.proc.daemon = True self.proc.start() - def send_messages(self, partition, *msg): + def send_messages(self, topic, partition, *msg): """ Helper method to send produce requests """ if self.async: for m in msg: - self.queue.put((partition, create_message(m))) + self.queue.put((TopicAndPartition(topic, partition), + create_message(m))) resp = [] else: messages = [create_message(m) for m in msg] - req = ProduceRequest(self.topic, partition, messages) + req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) @@ -169,7 +171,6 @@ class SimpleProducer(Producer): Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -180,27 +181,31 @@ class SimpleProducer(Producer): batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout """ - def __init__(self, client, topic, async=False, + def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client.load_metadata_for_topics(topic) - self.next_partition = cycle(client.topic_partitions[topic]) - + self.partition_cycles = {} super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send_messages(self, *msg): - partition = self.next_partition.next() - return super(SimpleProducer, self).send_messages(partition, *msg) + def _next_partition(self, topic): + if topic not in self.partition_cycles: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + return self.partition_cycles[topic].next() + + def send_messages(self, topic, *msg): + partition = self._next_partition(topic) + return super(SimpleProducer, self).send_messages(topic, partition, *msg) def __repr__(self): - return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async) + return '<SimpleProducer batch=%s>' % self.async class KeyedProducer(Producer): @@ -209,7 +214,6 @@ class KeyedProducer(Producer): Args: client - The kafka client instance - topic - The kafka topic to send messages to partitioner - A partitioner class that will be used to get the partition to send the message to. Must be derived from Partitioner async - If True, the messages are sent asynchronously via another @@ -220,29 +224,34 @@ class KeyedProducer(Producer): batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout """ - def __init__(self, client, topic, partitioner=None, async=False, + def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client.load_metadata_for_topics(topic) - if not partitioner: partitioner = HashedPartitioner - - self.partitioner = partitioner(client.topic_partitions[topic]) + self.partitioner_class = partitioner + self.partitioners = {} super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send(self, key, msg): - partitions = self.client.topic_partitions[self.topic] - partition = self.partitioner.partition(key, partitions) - return self.send_messages(partition, msg) + def _next_partition(self, topic, key): + if topic not in self.partitioners: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partitioners[topic] = \ + self.partitioner_class(self.client.topic_partitions[topic]) + partitioner = self.partitioners[topic] + return partitioner.partition(key, self.client.topic_partitions[topic]) + + def send(self, topic, key, msg): + partition = self._next_partition(topic, key) + return self.send_messages(topic, partition, msg) def __repr__(self): - return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async) + return '<KeyedProducer batch=%s>' % self.async |