diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-17 17:29:54 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-17 23:22:35 -0800 |
commit | f1ad0247df5bf6e0315ffbb1633d5979da828de0 (patch) | |
tree | ca96d1d960a13ae481b76fd32761ea535234f02b /kafka/future.py | |
parent | 799824535ceeb698152a3078f64ecbf6baca9b39 (diff) | |
download | kafka-python-f1ad0247df5bf6e0315ffbb1633d5979da828de0.tar.gz |
Switch BrokerConnection to (mostly) non-blocking IO.
- return kafka.Future on send()
- recv is now non-blocking call that completes futures when possible
- update KafkaClient to block on future completion
Diffstat (limited to 'kafka/future.py')
-rw-r--r-- | kafka/future.py | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/kafka/future.py b/kafka/future.py new file mode 100644 index 0000000..24173bb --- /dev/null +++ b/kafka/future.py @@ -0,0 +1,51 @@ +from kafka.common import RetriableError, IllegalStateError + + +class Future(object): + def __init__(self): + self.is_done = False + self.value = None + self.exception = None + self._callbacks = [] + self._errbacks = [] + + def succeeded(self): + return self.is_done and not self.exception + + def failed(self): + return self.is_done and self.exception + + def retriable(self): + return isinstance(self.exception, RetriableError) + + def success(self, value): + if self.is_done: + raise IllegalStateError('Invalid attempt to complete a request future which is already complete') + self.value = value + self.is_done = True + for f in self._callbacks: + f(value) + return self + + def failure(self, e): + if self.is_done: + raise IllegalStateError('Invalid attempt to complete a request future which is already complete') + self.exception = e + self.is_done = True + for f in self._errbacks: + f(e) + return self + + def add_callback(self, f): + if self.is_done and not self.exception: + f(self.value) + else: + self._callbacks.append(f) + return self + + def add_errback(self, f): + if self.is_done and self.exception: + f(self.exception) + else: + self._errbacks.append(f) + return self |