summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-28 14:54:11 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-28 14:54:11 +0530
commitc54a2edbaec1c4442cc63c8d3f0874b5882e90bb (patch)
tree85dfb8da9fbf79d474688f921a66d7f8654199b4 /kafka/consumer.py
parentc13ee1df6ab2900ffe0bd48e6376993c0d312a70 (diff)
downloadkafka-python-c54a2edbaec1c4442cc63c8d3f0874b5882e90bb.tar.gz
Add more cleanup in consumer.stop()
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 23a2f90..6ceea72 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -159,6 +159,11 @@ class Consumer(object):
if self.count_since_commit > self.auto_commit_every_n:
self.commit()
+ def stop(self):
+ if self.commit_timer is not None:
+ self.commit_timer.stop()
+ self.commit()
+
def pending(self, partitions=None):
"""
Gets the pending message count
@@ -226,11 +231,6 @@ class SimpleConsumer(Consumer):
"""
self.partition_info = True
- def stop(self):
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
-
def seek(self, offset, whence):
"""
Alter the current offset in the consumer, similar to fseek
@@ -510,6 +510,8 @@ class MultiProcessConsumer(Consumer):
proc.join()
proc.terminate()
+ super(MultiProcessConsumer, self).stop()
+
def __iter__(self):
"""
Iterator to consume the messages available on this consumer