summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-12 14:56:58 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-12 14:56:58 +0530
commitab72da3db32556b04d45954da8cf6c462dc53434 (patch)
treeee656a89aa436343536f40859af8b6b3c93bf091 /kafka/consumer.py
parent77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff)
downloadkafka-python-ab72da3db32556b04d45954da8cf6c462dc53434.tar.gz
Add support to consume messages from specific partitions
Currently, the Kafka SimpleConsumer consumes messages from all partitions of a topic. This commit ensures that data is consumed only from the partitions specified during init; if none are specified, all partitions of the topic are consumed as before.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..467bd76 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -24,6 +24,7 @@ class SimpleConsumer(object):
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
topic: the topic to consume
+ partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
@@ -38,7 +39,7 @@ class SimpleConsumer(object):
these triggers
"""
- def __init__(self, client, group, topic, auto_commit=True,
+ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
@@ -47,6 +48,9 @@ class SimpleConsumer(object):
self.client._load_metadata_for_topics(topic)
self.offsets = {}
+ if not partitions:
+ partitions = self.client.topic_partitions[topic]
+
# Variables for handling offset commits
self.commit_lock = Lock()
self.commit_timer = None
@@ -73,14 +77,14 @@ class SimpleConsumer(object):
# Uncomment for 0.8.1
#
- #for partition in self.client.topic_partitions[topic]:
+ #for partition in partitions:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
# callback=get_or_init_offset_callback,
# fail_on_error=False)
# self.offsets[partition] = offset
- for partition in self.client.topic_partitions[topic]:
+ for partition in partitions:
self.offsets[partition] = 0
def stop(self):
@@ -126,13 +130,13 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
- def pending(self, partitions=[]):
+ def pending(self, partitions=None):
"""
Gets the pending message count
partitions: list of partitions to check for, default is to check all
"""
- if len(partitions) == 0:
+ if not partitions:
partitions = self.offsets.keys()
total = 0
@@ -159,7 +163,7 @@ class SimpleConsumer(object):
# Once the commit is done, start the timer again
self.commit_timer.start()
- def commit(self, partitions=[]):
+ def commit(self, partitions=None):
"""
Commit offsets for this consumer
@@ -173,7 +177,7 @@ class SimpleConsumer(object):
with self.commit_lock:
reqs = []
- if len(partitions) == 0: # commit all partitions
+ if not partitions: # commit all partitions
partitions = self.offsets.keys()
for partition in partitions: