summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/base.py28
-rw-r--r--kafka/producer/base.py27
2 files changed, 51 insertions, 4 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 0bbf46c..64d96ea 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import atexit
import logging
import numbers
from threading import Lock
@@ -75,6 +76,11 @@ class Consumer(object):
for partition in partitions:
self.offsets[partition] = 0
+ # Register a cleanup handler
+ def cleanup(obj):
+ obj.stop()
+ self._cleanup_func = cleanup
+ atexit.register(cleanup, self)
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
@@ -157,14 +163,30 @@ class Consumer(object):
if self.count_since_commit >= self.auto_commit_every_n:
self.commit()
- def __del__(self):
- self.stop()
-
def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()
+ if hasattr(self, '_cleanup_func'):
+ # Remove cleanup handler now that we've stopped
+
+ # py3 supports unregistering
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(self._cleanup_func) # pylint: disable=no-member
+
+ # py2 requires removing from private attribute...
+ else:
+
+ # ValueError on list.remove() if the exithandler no longer
+ # exists is fine here
+ try:
+ atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ except ValueError:
+ pass
+
+ del self._cleanup_func
+
def pending(self, partitions=None):
"""
Gets the pending message count
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6a5a94e..2344168 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import atexit
import logging
import time
@@ -152,7 +153,11 @@ class Producer(object):
self.thread.daemon = True
self.thread.start()
-
+ def cleanup(obj):
+ if obj.stopped:
+ obj.stop()
+ self._cleanup_func = cleanup
+ atexit.register(cleanup, self)
def send_messages(self, topic, partition, *msg):
"""
@@ -213,6 +218,26 @@ class Producer(object):
if self.thread.is_alive():
self.thread_stop_event.set()
+
+ if hasattr(self, '_cleanup_func'):
+ # Remove cleanup handler now that we've stopped
+
+ # py3 supports unregistering
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(self._cleanup_func) # pylint: disable=no-member
+
+ # py2 requires removing from private attribute...
+ else:
+
+ # ValueError on list.remove() if the exithandler no longer exists
+ # but that is fine here
+ try:
+ atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ except ValueError:
+ pass
+
+ del self._cleanup_func
+
self.stopped = True
def __del__(self):