diff options
-rw-r--r-- | kafka/consumer.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 23a2f90..6ceea72 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -159,6 +159,11 @@ class Consumer(object): if self.count_since_commit > self.auto_commit_every_n: self.commit() + def stop(self): + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + def pending(self, partitions=None): """ Gets the pending message count @@ -226,11 +231,6 @@ class SimpleConsumer(Consumer): """ self.partition_info = True - def stop(self): - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - def seek(self, offset, whence): """ Alter the current offset in the consumer, similar to fseek @@ -510,6 +510,8 @@ class MultiProcessConsumer(Consumer): proc.join() proc.terminate() + super(MultiProcessConsumer, self).stop() + def __iter__(self): """ Iterator to consume the messages available on this consumer |