summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py28
1 files changed, 25 insertions, 3 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 2bd42eb..aef898a 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import atexit
import logging
import numbers
from threading import Lock
@@ -75,6 +76,11 @@ class Consumer(object):
for partition in partitions:
self.offsets[partition] = 0
+ # Register a cleanup handler
+ def cleanup(obj):
+ obj.stop()
+ self._cleanup_func = cleanup
+ atexit.register(cleanup, self)
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
@@ -157,14 +163,30 @@ class Consumer(object):
if self.count_since_commit >= self.auto_commit_every_n:
self.commit()
- def __del__(self):
- self.stop()
-
def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()
+ if hasattr(self, '_cleanup_func'):
+ # Remove cleanup handler now that we've stopped
+
+ # py3 supports unregistering
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(self._cleanup_func) # pylint: disable=no-member
+
+ # py2 requires removing from private attribute...
+ else:
+
+ # ValueError on list.remove() if the exithandler no longer
+ # exists is fine here
+ try:
+ atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ except ValueError:
+ pass
+
+ del self._cleanup_func
+
def pending(self, partitions=None):
"""
Gets the pending message count