summaryrefslogtreecommitdiff
path: root/kafka/protocol/commit.py
blob: a32f8d3b9c3407bfe4a7fd956275805e9e6e0ab9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String


class OffsetCommitResponse(Struct):
    """Response structure referenced as RESPONSE_TYPE by the OffsetCommit requests.

    The payload is a single array of topics. Each topic entry carries the
    topic name and an array of partitions, and each partition entry pairs a
    partition number with an error code.
    """

    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
    )


class OffsetCommitRequest_v2(Struct):
    """OffsetCommit request, API version 2.

    Compared with version 1 this layout adds a request-level
    ``retention_time`` field and drops the per-partition ``timestamp``.
    """

    API_KEY = 8
    # v2: added retention_time, dropped timestamp
    API_VERSION = 2
    RESPONSE_TYPE = OffsetCommitResponse

    # NOTE(review): presumably the placeholder values callers send when no
    # generation id / retention time applies -- confirm against the callers
    # and the Kafka protocol guide.
    DEFAULT_GENERATION_ID = -1
    DEFAULT_RETENTION_TIME = -1

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        ('consumer_group_generation_id', Int32),
        ('consumer_id', String('utf-8')),
        ('retention_time', Int64),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )


class OffsetCommitRequest_v1(Struct):
    """OffsetCommit request, API version 1 (Kafka-backed storage).

    Identifies the committer by consumer group, group generation id and
    consumer id, then lists topics. Each partition entry carries an offset,
    a timestamp and a metadata string.
    """

    API_KEY = 8
    # v1: Kafka-backed storage
    API_VERSION = 1
    RESPONSE_TYPE = OffsetCommitResponse

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        ('consumer_group_generation_id', Int32),
        ('consumer_id', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('timestamp', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )


class OffsetCommitRequest_v0(Struct):
    """OffsetCommit request, API version 0 (Zookeeper-backed storage).

    Carries only the consumer group name and an array of topics. Each
    partition entry holds the partition number, the offset to commit and a
    metadata string.
    """

    API_KEY = 8
    # v0: Zookeeper-backed storage
    API_VERSION = 0
    RESPONSE_TYPE = OffsetCommitResponse

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )


class OffsetFetchResponse(Struct):
    """Response structure referenced as RESPONSE_TYPE by the OffsetFetch requests.

    The payload is an array of topics. Each partition entry under a topic
    carries the partition number, an offset, a metadata string and an error
    code.
    """

    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
    )


class OffsetFetchRequest_v1(Struct):
    """OffsetFetch request, API version 1 (kafka-backed storage).

    Names a consumer group and, per topic, the array of partition numbers
    whose offsets are requested.
    """

    API_KEY = 9
    # v1: kafka-backed storage
    API_VERSION = 1
    RESPONSE_TYPE = OffsetFetchResponse

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                ('partitions', Array(Int32)),
            ),
        ),
    )


class OffsetFetchRequest_v0(Struct):
    """OffsetFetch request, API version 0 (zookeeper-backed storage).

    Names a consumer group and, per topic, the array of partition numbers
    whose offsets are requested.
    """

    API_KEY = 9
    # v0: zookeeper-backed storage
    API_VERSION = 0
    RESPONSE_TYPE = OffsetFetchResponse

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                ('partitions', Array(Int32)),
            ),
        ),
    )


class GroupCoordinatorResponse(Struct):
    """Response structure referenced as RESPONSE_TYPE by GroupCoordinatorRequest.

    A flat record: an error code followed by the coordinator's id, host
    and port.
    """

    SCHEMA = Schema(
        ('error_code', Int16),
        ('coordinator_id', Int32),
        ('host', String('utf-8')),
        ('port', Int32),
    )


class GroupCoordinatorRequest(Struct):
    """GroupCoordinator request, API version 0.

    The only field is the name of the consumer group; the reply is decoded
    as a GroupCoordinatorResponse.
    """

    API_KEY = 10
    API_VERSION = 0
    RESPONSE_TYPE = GroupCoordinatorResponse

    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
    )