# path: root/kafka/coordinator/protocol.py
# blob: 56a39015912a0ac7a85d8463a2da8a873a857030
from __future__ import absolute_import

from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
from kafka.structs import TopicPartition


class ConsumerProtocolMemberMetadata(Struct):
    """Struct describing a consumer-protocol member's metadata.

    Fields, in schema order:
        version: an Int16 value.
        subscription: an array of UTF-8 strings (presumably the topic names
            the member subscribes to -- confirm against callers).
        user_data: raw bytes; this schema does not interpret them.
    """
    SCHEMA = Schema(
        ('version', Int16),
        ('subscription', Array(String('utf-8'))),
        ('user_data', Bytes))


class ConsumerProtocolMemberAssignment(Struct):
    """Struct describing a consumer-protocol member's assignment.

    Fields, in schema order:
        version: an Int16 value.
        assignment: an array of (topic, partitions) entries, where topic is
            a UTF-8 string and partitions is an array of Int32 values.
        user_data: raw bytes; this schema does not interpret them.
    """
    SCHEMA = Schema(
        ('version', Int16),
        ('assignment', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(Int32)),
        )),
        ('user_data', Bytes),
    )

    def partitions(self):
        """Flatten ``assignment`` into a list of TopicPartition tuples.

        Returns:
            A list with one ``TopicPartition(topic, partition)`` per
            partition id, in the order the entries appear in
            ``self.assignment``.
        """
        flattened = []
        # ``assignment`` is set dynamically from SCHEMA, so pylint cannot see it.
        for topic_name, partition_ids in self.assignment:  # pylint: disable=no-member
            flattened.extend(
                TopicPartition(topic_name, partition_id)
                for partition_id in partition_ids
            )
        return flattened


class ConsumerProtocol(object):
    """Namespace of constants for the 'consumer' protocol type.

    Groups the protocol type string, the assignment strategy names, and the
    Struct classes used for member metadata and member assignment.
    """
    # Protocol type identifier string.
    PROTOCOL_TYPE = 'consumer'
    # Names of the assignment strategies listed for this protocol.
    ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
    # Struct class used for member metadata.
    METADATA = ConsumerProtocolMemberMetadata
    # Struct class used for member assignment.
    ASSIGNMENT = ConsumerProtocolMemberAssignment