diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..467bd76 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,7 @@ class SimpleConsumer(object): client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + partitions: An optional list of partitions to consume the data from auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume @@ -38,7 +39,7 @@ class SimpleConsumer(object): these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client @@ -47,6 +48,9 @@ class SimpleConsumer(object): self.client._load_metadata_for_topics(topic) self.offsets = {} + if not partitions: + partitions = self.client.topic_partitions[topic] + # Variables for handling offset commits self.commit_lock = Lock() self.commit_timer = None @@ -73,14 +77,14 @@ class SimpleConsumer(object): # Uncomment for 0.8.1 # - #for partition in self.client.topic_partitions[topic]: + #for partition in partitions: # req = OffsetFetchRequest(topic, partition) # (offset,) = self.client.send_offset_fetch_request(group, [req], # callback=get_or_init_offset_callback, # fail_on_error=False) # self.offsets[partition] = offset - for partition in self.client.topic_partitions[topic]: + for partition in partitions: self.offsets[partition] = 0 def stop(self): @@ -126,13 +130,13 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=[]): + def pending(self, partitions=None): """ Gets the pending message count partitions: list of partitions to check for, default is to check all """ - if len(partitions) == 0: + if not partitions: partitions = self.offsets.keys() total = 0 @@ -159,7 +163,7 @@ class SimpleConsumer(object): # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=None): """ Commit offsets for this consumer @@ -173,7 +177,7 @@ class SimpleConsumer(object): with self.commit_lock: reqs = [] - if len(partitions) == 0: # commit all partitions + if not partitions: # commit all partitions partitions = self.offsets.keys() for partition in partitions: |