diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-14 10:07:58 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:54 -0800 |
commit | 62a71892a687d99bf0076ed3a3d9c614f16a112c (patch) | |
tree | df2f9ce9916446d37dba5a43d011381ec2282a03 /kafka/consumer | |
parent | 4c111d3d7c760d9e67be5251eef1df02a64f33c7 (diff) | |
download | kafka-python-62a71892a687d99bf0076ed3a3d9c614f16a112c.tar.gz |
Add set_topic_partitions method to configure topics/partitions to consume
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/new.py | 67 |
1 file changed, 59 insertions, 8 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index 80fdcec..54b1922 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -4,12 +4,15 @@ import logging import sys import time +import six + from kafka.client import KafkaClient from kafka.common import ( OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, - FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError + FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError, + TopicAndPartition ) logger = logging.getLogger(__name__) @@ -118,15 +121,9 @@ class KafkaConsumer(object): """ def __init__(self, *topics, **configs): - self.topics = topics self.configure(**configs) - # Get initial topic metadata - self.client.load_metadata_for_topics() - for topic in self.topics: - if topic not in self.client.topic_partitions: - raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) - logger.info("Configuring consumer to fetch topic '%s'", topic) + self.set_topic_partitions(*topics) # Setup offsets self._offsets = OffsetsStruct(fetch=defaultdict(dict), @@ -333,6 +330,60 @@ class KafkaConsumer(object): client_id=self._config['client_id'], timeout=(self._config['socket_timeout_ms'] / 1000.0)) + def set_topic_partitions(self, *topics): + """ + Set the topic/partitions to consume + + Accepts types: + str - topic name, will consume all available partitions + TopicAndPartition namedtuple - will consume topic/partition + tuple - will consume (topic, partition) + dict - will consume { topic: partition } + { topic: [partition list] } + { topic: (partition tuple,) } + + Ex: + kafka = KafkaConsumer() + + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + """ + self.topics = [] + self.client.load_metadata_for_topics() + + for 
arg in topics: + if isinstance(arg, six.string_types): + for partition in self.client.get_partition_ids_for_topic(arg): + self.topics.append(TopicAndPartition(arg, partition)) + + elif isinstance(arg, TopicAndPartition): + self.topics.append(arg) + + elif isinstance(arg, tuple): + self.topics.append(TopicAndPartition(*arg)) + + elif isinstance(arg, dict): + for topic in arg: + if isinstance(arg[topic], int): + self.topics.append(TopicAndPartition(topic, arg[topic])) + elif isinstance(arg[topic], (list, tuple)): + for partition in arg[topic]: + self.topics.append(TopicAndPartition(topic, partition)) + else: + raise KafkaConfigurationError('Unknown topic type (dict key must be ' + 'int or list/tuple of ints)') + else: + raise KafkaConfigurationError('Unknown topic type (topic must be ' + 'string, TopicAndPartition, ' + '(topic,partition) tuple, or {topic: ' + 'partitions} dict)') + + # Get initial topic metadata + for topic_partitions in self.topics: + if topic not in self.client.topic_partitions: + raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) + logger.info("Configuring consumer to fetch topic '%s'", topic) + def fetch_messages(self): max_bytes = self._config['fetch_message_max_bytes'] |