summaryrefslogtreecommitdiff
path: root/kafka/future.py
blob: 24173bb3251f1b0d3af9ffd20777fd1c6edbdb83 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from kafka.common import RetriableError, IllegalStateError


class Future(object):
    """Placeholder for the outcome of an operation that completes later.

    A future starts out not done. It is completed exactly once, either with
    ``success(value)`` or with ``failure(exception)``. Callbacks registered
    with ``add_callback`` receive the value; those registered with
    ``add_errback`` receive the exception.

    Attributes:
        is_done: True once ``success`` or ``failure`` has been called.
        value: The value passed to ``success`` (None until then).
        exception: The object passed to ``failure`` (None until then).
    """

    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None
        self._callbacks = []
        self._errbacks = []

    def succeeded(self):
        """Return True if the future is done and no exception was recorded."""
        # Compare against None rather than relying on truthiness: an
        # exception class may define __len__/__bool__ and evaluate as falsy.
        return self.is_done and self.exception is None

    def failed(self):
        """Return True if the future is done and an exception was recorded."""
        # Always yields a bool (never the exception object itself).
        return self.is_done and self.exception is not None

    def retriable(self):
        """Return True if the recorded exception is a RetriableError."""
        return isinstance(self.exception, RetriableError)

    def success(self, value):
        """Complete the future with ``value`` and run the registered callbacks.

        Callbacks are invoked in registration order with ``value`` as their
        only argument.

        Returns:
            This future, so calls can be chained.

        Raises:
            IllegalStateError: If the future has already been completed.
        """
        if self.is_done:
            raise IllegalStateError('Invalid attempt to complete a request future which is already complete')
        self.value = value
        # Mark done before running callbacks so that a callback which
        # registers another callback has it invoked immediately.
        self.is_done = True
        for f in self._callbacks:
            f(value)
        return self

    def failure(self, e):
        """Complete the future with exception ``e`` and run the errbacks.

        Errbacks are invoked in registration order with ``e`` as their only
        argument.

        Returns:
            This future, so calls can be chained.

        Raises:
            IllegalStateError: If the future has already been completed.
        """
        if self.is_done:
            raise IllegalStateError('Invalid attempt to complete a request future which is already complete')
        self.exception = e
        # Mark done before running errbacks (see success()).
        self.is_done = True
        for f in self._errbacks:
            f(e)
        return self

    def add_callback(self, f):
        """Register ``f`` to be called with the value on success.

        If the future has already succeeded, ``f`` is called immediately with
        the stored value; otherwise it is appended to the callback list.

        Returns:
            This future, so calls can be chained.
        """
        if self.is_done and self.exception is None:
            f(self.value)
        else:
            self._callbacks.append(f)
        return self

    def add_errback(self, f):
        """Register ``f`` to be called with the exception on failure.

        If the future has already failed, ``f`` is called immediately with the
        stored exception; otherwise it is appended to the errback list.

        Returns:
            This future, so calls can be chained.
        """
        if self.is_done and self.exception is not None:
            f(self.exception)
        else:
            self._errbacks.append(f)
        return self