diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 14:56:58 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 14:56:58 +0530 |
commit | ab72da3db32556b04d45954da8cf6c462dc53434 (patch) | |
tree | ee656a89aa436343536f40859af8b6b3c93bf091 /kafka/consumer.py | |
parent | 77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff) | |
download | kafka-python-ab72da3db32556b04d45954da8cf6c462dc53434.tar.gz |
Add support to consume messages from specific partitions
Currently the kafka SimpleConsumer consumes messages from all
partitions. This commit will ensure that data is consumed only
from partitions specified during init
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..467bd76 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,7 @@ class SimpleConsumer(object): client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + partitions: An optional list of partitions to consume the data from auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume @@ -38,7 +39,7 @@ class SimpleConsumer(object): these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client @@ -47,6 +48,9 @@ class SimpleConsumer(object): self.client._load_metadata_for_topics(topic) self.offsets = {} + if not partitions: + partitions = self.client.topic_partitions[topic] + # Variables for handling offset commits self.commit_lock = Lock() self.commit_timer = None @@ -73,14 +77,14 @@ class SimpleConsumer(object): # Uncomment for 0.8.1 # - #for partition in self.client.topic_partitions[topic]: + #for partition in partitions: # req = OffsetFetchRequest(topic, partition) # (offset,) = self.client.send_offset_fetch_request(group, [req], # callback=get_or_init_offset_callback, # fail_on_error=False) # self.offsets[partition] = offset - for partition in self.client.topic_partitions[topic]: + for partition in partitions: self.offsets[partition] = 0 def stop(self): @@ -126,13 +130,13 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=[]): + def pending(self, partitions=None): """ Gets the pending message count partitions: list of partitions to check for, default is to check all """ - if len(partitions) == 0: + if not partitions: partitions = self.offsets.keys() total = 0 @@ -159,7 +163,7 @@ class SimpleConsumer(object): # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=None): """ Commit offsets for this consumer @@ -173,7 +177,7 @@ class SimpleConsumer(object): with self.commit_lock: reqs = [] - if len(partitions) == 0: # commit all partitions + if not partitions: # commit all partitions partitions = self.offsets.keys() for partition in partitions: |