diff options
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 40 |
1 files changed, 37 insertions, 3 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0793c80..2185869 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -5,12 +5,13 @@ import copy import logging import threading import time +import weakref +from .. import errors as Errors from ..client_async import KafkaClient -from ..structs import TopicPartition from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet -from .. import errors as Errors +from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult from .record_accumulator import AtomicInteger, RecordAccumulator from .sender import Sender @@ -293,14 +294,47 @@ class KafkaProducer(object): self._sender.daemon = True self._sender.start() self._closed = False - atexit.register(self.close, timeout=0) + + self._cleanup = self._cleanup_factory() + atexit.register(self._cleanup) log.debug("Kafka producer started") + def _cleanup_factory(self): + """Build a cleanup clojure that doesn't increase our ref count""" + _self = weakref.proxy(self) + def wrapper(): + try: + _self.close() + except (ReferenceError, AttributeError): + pass + return wrapper + + def _unregister_cleanup(self): + if getattr(self, '_cleanup'): + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer exists + # but that is fine here + try: + atexit._exithandlers.remove( # pylint: disable=no-member + (self._cleanup, (), {})) + except ValueError: + pass + self._cleanup = None + def __del__(self): self.close(timeout=0) def close(self, timeout=None): """Close this producer.""" + + # drop our atexit handler now to avoid leaks + self._unregister_cleanup() + if not hasattr(self, '_closed') or self._closed: log.info('Kafka producer closed') return |