diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-31 10:29:55 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-31 15:26:01 -0700 |
commit | 47989db113ff1603b081867f3914e0c0828dfc9c (patch) | |
tree | 82c9196a65b161a54538a84fe4af887c81e13497 | |
parent | 9fd08119170b64c56ea024d12ef6b0e6482d778b (diff) | |
download | kafka-python-47989db113ff1603b081867f3914e0c0828dfc9c.tar.gz |
Register atexit handlers for consumer and producer thread/multiprocess cleanup (not __del__)
-rw-r--r-- | kafka/consumer/base.py | 28 | ||||
-rw-r--r-- | kafka/producer/base.py | 27 |
2 files changed, 51 insertions, 4 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 0bbf46c..64d96ea 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import atexit import logging import numbers from threading import Lock @@ -75,6 +76,11 @@ class Consumer(object): for partition in partitions: self.offsets[partition] = 0 + # Register a cleanup handler + def cleanup(obj): + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) def fetch_last_known_offsets(self, partitions=None): if self.group is None: @@ -157,14 +163,30 @@ class Consumer(object): if self.count_since_commit >= self.auto_commit_every_n: self.commit() - def __del__(self): - self.stop() - def stop(self): if self.commit_timer is not None: self.commit_timer.stop() self.commit() + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer + # exists is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + def pending(self, partitions=None): """ Gets the pending message count diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6a5a94e..2344168 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import atexit import logging import time @@ -152,7 +153,11 @@ class Producer(object): self.thread.daemon = True self.thread.start() - + def cleanup(obj): + if obj.stopped: + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) def send_messages(self, topic, partition, *msg): """ @@ -213,6 +218,26 @@ class Producer(object): if self.thread.is_alive(): self.thread_stop_event.set() + + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer exists + # but that is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + self.stopped = True def __del__(self): |