summaryrefslogtreecommitdiff
path: root/kafka/util.py
diff options
context:
space:
mode:
authoraisch <me+bosch@aitmp.com>2016-02-16 21:30:38 -0800
committeraisch <me+bosch@aitmp.com>2016-02-16 21:30:38 -0800
commitd7522b0fb79bffbe10a2548658a48829dd1a5c33 (patch)
tree53b9e5f662a0f583f82513c6b4f6e549f7088478 /kafka/util.py
parent9f0db5d38b444f5a93da7bed4a19114aff8701e8 (diff)
downloadkafka-python-d7522b0fb79bffbe10a2548658a48829dd1a5c33.tar.gz
break up some circular references and close client wake pipe on __del__
Diffstat (limited to 'kafka/util.py')
-rw-r--r--kafka/util.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/kafka/util.py b/kafka/util.py
index c6e77fa..7a11910 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -3,6 +3,7 @@ import collections
import struct
import sys
from threading import Thread, Event
+import weakref
import six
@@ -151,3 +152,39 @@ class ReentrantTimer(object):
def __del__(self):
self.stop()
+
+
+class WeakMethod(object):
+ """
+ Callable that weakly references a method and the object it is bound to. It
+ is based on http://stackoverflow.com/a/24287465.
+
+ Arguments:
+
+ object_dot_method: A bound instance method (i.e. 'object.method').
+ """
+ def __init__(self, object_dot_method):
+ try:
+ self.target = weakref.ref(object_dot_method.__self__)
+ except AttributeError:
+ self.target = weakref.ref(object_dot_method.im_self)
+ self._target_id = id(self.target())
+ try:
+ self.method = weakref.ref(object_dot_method.__func__)
+ except AttributeError:
+ self.method = weakref.ref(object_dot_method.im_func)
+ self._method_id = id(self.method())
+
+ def __call__(self, *args, **kwargs):
+ """
+ Calls the method on target with args and kwargs.
+ """
+ return self.method()(self.target(), *args, **kwargs)
+
+ def __hash__(self):
+ return hash(self.target) ^ hash(self.method)
+
+ def __eq__(self, other):
+ if not isinstance(other, WeakMethod):
+ return False
+ return self._target_id == other._target_id and self._method_id == other._method_id