summaryrefslogtreecommitdiff
path: root/kafka/coordinator/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/protocol.py')
-rw-r--r--kafka/coordinator/protocol.py33
1 files changed, 33 insertions, 0 deletions
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
new file mode 100644
index 0000000..9af7225
--- /dev/null
+++ b/kafka/coordinator/protocol.py
@@ -0,0 +1,33 @@
+from __future__ import absolute_import
+
+from kafka.common import TopicPartition
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))),
+ ('user_data', Bytes))
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes))
+
+ def partitions(self):
+ return [TopicPartition(topic, partition)
+ for topic, partitions in self.assignment # pylint: disable-msg=no-member
+ for partition in partitions]
+
+
+class ConsumerProtocol(object):
+ PROTOCOL_TYPE = 'consumer'
+ ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ METADATA = ConsumerProtocolMemberMetadata
+ ASSIGNMENT = ConsumerProtocolMemberAssignment