summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
author Dana Powers <dana.powers@rd.io> 2014-09-14 10:07:58 -0700
committer Dana Powers <dana.powers@rd.io> 2014-12-15 12:42:54 -0800
commit 62a71892a687d99bf0076ed3a3d9c614f16a112c (patch)
tree df2f9ce9916446d37dba5a43d011381ec2282a03 /kafka/consumer
parent 4c111d3d7c760d9e67be5251eef1df02a64f33c7 (diff)
download kafka-python-62a71892a687d99bf0076ed3a3d9c614f16a112c.tar.gz
Add set_topic_partitions method to configure topics/partitions to consume
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- kafka/consumer/new.py 67
1 file changed, 59 insertions, 8 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 80fdcec..54b1922 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -4,12 +4,15 @@ import logging
import sys
import time
+import six
+
from kafka.client import KafkaClient
from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
- FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
+ FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError,
+ TopicAndPartition
)
logger = logging.getLogger(__name__)
@@ -118,15 +121,9 @@ class KafkaConsumer(object):
"""
def __init__(self, *topics, **configs):
- self.topics = topics
self.configure(**configs)
- # Get initial topic metadata
- self.client.load_metadata_for_topics()
- for topic in self.topics:
- if topic not in self.client.topic_partitions:
- raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
- logger.info("Configuring consumer to fetch topic '%s'", topic)
+ self.set_topic_partitions(*topics)
# Setup offsets
self._offsets = OffsetsStruct(fetch=defaultdict(dict),
@@ -333,6 +330,60 @@ class KafkaConsumer(object):
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0))
+    def set_topic_partitions(self, *topics):
+        """
+        Set the topic/partitions to consume
+
+        Refreshes client metadata, then replaces self.topics with a list of
+        TopicAndPartition entries built from the arguments.
+
+        Accepts types:
+            str - topic name, will consume all available partitions
+            TopicAndPartition namedtuple - will consume topic/partition
+            tuple - will consume (topic, partition)
+            dict - will consume { topic: partition }
+                                { topic: [partition list] }
+                                { topic: (partition tuple,) }
+
+        Raises:
+            KafkaConfigurationError - an argument (or a dict value) is not
+                one of the accepted types
+            UnknownTopicOrPartitionError - a requested topic is missing from
+                self.client.topic_partitions
+
+        Ex:
+            kafka = KafkaConsumer()
+
+            # Consume topic1-all; topic2-partition2; topic3-partition0
+            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+        """
+        self.topics = []
+        self.client.load_metadata_for_topics()
+
+        for arg in topics:
+            if isinstance(arg, six.string_types):
+                # A bare topic name expands to the partitions the client
+                # reports for it, so an unknown topic must be rejected here:
+                # it would add no entries and escape the check further down.
+                if arg not in self.client.topic_partitions:
+                    raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % arg)
+                for partition in self.client.get_partition_ids_for_topic(arg):
+                    self.topics.append(TopicAndPartition(arg, partition))
+
+            # Must be tested before the plain-tuple branch so an existing
+            # TopicAndPartition is stored as-is rather than re-wrapped
+            elif isinstance(arg, TopicAndPartition):
+                self.topics.append(arg)
+
+            elif isinstance(arg, tuple):
+                self.topics.append(TopicAndPartition(*arg))
+
+            elif isinstance(arg, dict):
+                for topic in arg:
+                    if isinstance(arg[topic], int):
+                        self.topics.append(TopicAndPartition(topic, arg[topic]))
+                    elif isinstance(arg[topic], (list, tuple)):
+                        for partition in arg[topic]:
+                            self.topics.append(TopicAndPartition(topic, partition))
+                    else:
+                        raise KafkaConfigurationError('Unknown topic type (dict key must be '
+                                                      'int or list/tuple of ints)')
+            else:
+                raise KafkaConfigurationError('Unknown topic type (topic must be '
+                                              'string, TopicAndPartition, '
+                                              '(topic,partition) tuple, or {topic: '
+                                              'partitions} dict)')
+
+        # Check each collected entry's own topic against the client metadata.
+        # The loop variable has to be unpacked here; reading a name left over
+        # from the dict branch above would validate the wrong topic.
+        for topic, _partition in self.topics:
+            if topic not in self.client.topic_partitions:
+                raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+            logger.info("Configuring consumer to fetch topic '%s'", topic)
+
def fetch_messages(self):
max_bytes = self._config['fetch_message_max_bytes']