diff options
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r-- | kafka/consumer/base.py | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 0bbf46c..64d96ea 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import atexit import logging import numbers from threading import Lock @@ -75,6 +76,11 @@ class Consumer(object): for partition in partitions: self.offsets[partition] = 0 + # Register a cleanup handler + def cleanup(obj): + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) def fetch_last_known_offsets(self, partitions=None): if self.group is None: @@ -157,14 +163,30 @@ class Consumer(object): if self.count_since_commit >= self.auto_commit_every_n: self.commit() - def __del__(self): - self.stop() - def stop(self): if self.commit_timer is not None: self.commit_timer.stop() self.commit() + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer + # exists is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + def pending(self, partitions=None): """ Gets the pending message count |