summaryrefslogtreecommitdiff
path: root/kafka/future.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-17 17:29:54 -0800
committerDana Powers <dana.powers@rd.io>2015-12-17 23:22:35 -0800
commitf1ad0247df5bf6e0315ffbb1633d5979da828de0 (patch)
treeca96d1d960a13ae481b76fd32761ea535234f02b /kafka/future.py
parent799824535ceeb698152a3078f64ecbf6baca9b39 (diff)
downloadkafka-python-f1ad0247df5bf6e0315ffbb1633d5979da828de0.tar.gz
Switch BrokerConnection to (mostly) non-blocking IO.
- return kafka.Future on send() - recv is now non-blocking call that completes futures when possible - update KafkaClient to block on future completion
Diffstat (limited to 'kafka/future.py')
-rw-r--r--kafka/future.py51
1 files changed, 51 insertions, 0 deletions
diff --git a/kafka/future.py b/kafka/future.py
new file mode 100644
index 0000000..24173bb
--- /dev/null
+++ b/kafka/future.py
@@ -0,0 +1,51 @@
+from kafka.common import RetriableError, IllegalStateError
+
+
+class Future(object):
+ def __init__(self):
+ self.is_done = False
+ self.value = None
+ self.exception = None
+ self._callbacks = []
+ self._errbacks = []
+
+ def succeeded(self):
+ return self.is_done and not self.exception
+
+ def failed(self):
+ return self.is_done and self.exception
+
+ def retriable(self):
+ return isinstance(self.exception, RetriableError)
+
+ def success(self, value):
+ if self.is_done:
+ raise IllegalStateError('Invalid attempt to complete a request future which is already complete')
+ self.value = value
+ self.is_done = True
+ for f in self._callbacks:
+ f(value)
+ return self
+
+ def failure(self, e):
+ if self.is_done:
+ raise IllegalStateError('Invalid attempt to complete a request future which is already complete')
+ self.exception = e
+ self.is_done = True
+ for f in self._errbacks:
+ f(e)
+ return self
+
+ def add_callback(self, f):
+ if self.is_done and not self.exception:
+ f(self.value)
+ else:
+ self._callbacks.append(f)
+ return self
+
+ def add_errback(self, f):
+ if self.is_done and self.exception:
+ f(self.exception)
+ else:
+ self._errbacks.append(f)
+ return self