1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
from collections import namedtuple
###############
# Structs #
###############
# Request payloads
ProduceRequest = namedtuple("ProduceRequest",
["topic", "partition", "messages"])
FetchRequest = namedtuple("FetchRequest",
["topic", "partition", "offset", "max_bytes"])
OffsetRequest = namedtuple("OffsetRequest",
["topic", "partition", "time", "max_offsets"])
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
"highwaterMark", "messages"])
OffsetResponse = namedtuple("OffsetResponse",
["topic", "partition", "error", "offsets"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse",
["topic", "partition", "error"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset",
"metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader",
"replicas", "isr"])
# Other useful structs
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
ErrorStrings = {
-1 : 'UNKNOWN',
0 : 'NO_ERROR',
1 : 'OFFSET_OUT_OF_RANGE',
2 : 'INVALID_MESSAGE',
3 : 'UNKNOWN_TOPIC_OR_PARTITON',
4 : 'INVALID_FETCH_SIZE',
5 : 'LEADER_NOT_AVAILABLE',
6 : 'NOT_LEADER_FOR_PARTITION',
7 : 'REQUEST_TIMED_OUT',
8 : 'BROKER_NOT_AVAILABLE',
9 : 'REPLICA_NOT_AVAILABLE',
10 : 'MESSAGE_SIZE_TOO_LARGE',
11 : 'STALE_CONTROLLER_EPOCH',
12 : 'OFFSET_METADATA_TOO_LARGE',
}
class ErrorMapping(object):
pass
for k, v in ErrorStrings.items():
setattr(ErrorMapping, v, k)
#################
# Exceptions #
#################
class KafkaError(RuntimeError):
pass
class KafkaUnavailableError(KafkaError):
pass
class BrokerResponseError(KafkaError):
pass
class LeaderUnavailableError(KafkaError):
pass
class PartitionUnavailableError(KafkaError):
pass
class FailedPayloadsError(KafkaError):
pass
class ConnectionError(KafkaError):
pass
class BufferUnderflowError(KafkaError):
pass
class ChecksumError(KafkaError):
pass
class ConsumerFetchSizeTooSmall(KafkaError):
pass
class ConsumerNoMoreData(KafkaError):
pass
|