summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..467bd76 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -24,6 +24,7 @@ class SimpleConsumer(object):
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
topic: the topic to consume
+ partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
@@ -38,7 +39,7 @@ class SimpleConsumer(object):
these triggers
"""
- def __init__(self, client, group, topic, auto_commit=True,
+ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
@@ -47,6 +48,9 @@ class SimpleConsumer(object):
self.client._load_metadata_for_topics(topic)
self.offsets = {}
+ if not partitions:
+ partitions = self.client.topic_partitions[topic]
+
# Variables for handling offset commits
self.commit_lock = Lock()
self.commit_timer = None
@@ -73,14 +77,14 @@ class SimpleConsumer(object):
# Uncomment for 0.8.1
#
- #for partition in self.client.topic_partitions[topic]:
+ #for partition in partitions:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
# callback=get_or_init_offset_callback,
# fail_on_error=False)
# self.offsets[partition] = offset
- for partition in self.client.topic_partitions[topic]:
+ for partition in partitions:
self.offsets[partition] = 0
def stop(self):
@@ -126,13 +130,13 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
- def pending(self, partitions=[]):
+ def pending(self, partitions=None):
"""
Gets the pending message count
partitions: list of partitions to check for, default is to check all
"""
- if len(partitions) == 0:
+ if not partitions:
partitions = self.offsets.keys()
total = 0
@@ -159,7 +163,7 @@ class SimpleConsumer(object):
# Once the commit is done, start the timer again
self.commit_timer.start()
- def commit(self, partitions=[]):
+ def commit(self, partitions=None):
"""
Commit offsets for this consumer
@@ -173,7 +177,7 @@ class SimpleConsumer(object):
with self.commit_lock:
reqs = []
- if len(partitions) == 0: # commit all partitions
+ if not partitions: # commit all partitions
partitions = self.offsets.keys()
for partition in partitions: