summaryrefslogtreecommitdiff
path: root/kafka/future.py
blob: 20c31cf97585d9242c33de49815291b02d1871a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from kafka.common import IllegalStateError


class Future(object):
    """Single-assignment holder for a result or an error, with callbacks.

    A future starts out pending. It is completed exactly once, either with
    ``success(value)`` or with ``failure(e)``; a second completion attempt
    raises ``IllegalStateError``. Callbacks registered before completion are
    stored and invoked synchronously, in registration order, by the
    completing call. Callbacks registered after completion are invoked
    immediately when they match the outcome.

    Attributes:
        is_done: True once ``success()`` or ``failure()`` has been called.
        value: The value passed to ``success()``; None otherwise.
        exception: The object passed to ``failure()``; None otherwise.
    """

    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None
        # The outcome is tracked explicitly rather than inferred from the
        # truthiness of ``exception``: an exception object may be falsy
        # (e.g. it defines __bool__ or __len__), which must not make a
        # failed future look like a successful one.
        self._failed = False
        self._callbacks = []
        self._errbacks = []

    def succeeded(self):
        """Return True if the future was completed via ``success()``."""
        return self.is_done and not self._failed

    def failed(self):
        """Return True if the future was completed via ``failure()``."""
        return self.is_done and self._failed

    def retriable(self):
        """Return the ``retriable`` attribute of the stored exception.

        Returns False when there is no stored exception or when it has no
        ``retriable`` attribute.
        """
        try:
            return self.exception.retriable
        except AttributeError:
            return False

    def _ensure_pending(self):
        """Raise IllegalStateError if the future has already been completed."""
        if self.is_done:
            raise IllegalStateError('Invalid attempt to complete a request future which is already complete')

    def success(self, value):
        """Complete the future successfully and run the success callbacks.

        Args:
            value: The result; stored on ``value`` and passed to each callback.

        Returns:
            This future, so calls can be chained.

        Raises:
            IllegalStateError: If the future is already complete.
        """
        self._ensure_pending()
        self.value = value
        # Mark as done before invoking callbacks so that a callback which
        # inspects or extends this future sees it as already completed.
        self.is_done = True
        for f in self._callbacks:
            f(value)
        return self

    def failure(self, e):
        """Complete the future with an error and run the error callbacks.

        Args:
            e: The error; stored on ``exception`` and passed to each errback.

        Returns:
            This future, so calls can be chained.

        Raises:
            IllegalStateError: If the future is already complete.
        """
        self._ensure_pending()
        self.exception = e
        self._failed = True
        self.is_done = True
        for f in self._errbacks:
            f(e)
        return self

    def add_callback(self, f):
        """Register ``f`` to be called with the value on success.

        If the future has already succeeded, ``f`` is called immediately.
        If it has already failed, ``f`` can never run and is not retained.

        Returns:
            This future, so calls can be chained.
        """
        if not self.is_done:
            self._callbacks.append(f)
        elif not self._failed:
            f(self.value)
        return self

    def add_errback(self, f):
        """Register ``f`` to be called with the error on failure.

        If the future has already failed, ``f`` is called immediately.
        If it has already succeeded, ``f`` can never run and is not retained.

        Returns:
            This future, so calls can be chained.
        """
        if not self.is_done:
            self._errbacks.append(f)
        elif self._failed:
            f(self.exception)
        return self