summaryrefslogtreecommitdiff
path: root/kafka/coordinator/protocol.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 00:25:12 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 00:25:12 -0800
commitb7104957f7294d3cb0e47d47ff1b6710acf5653e (patch)
tree3a6cd7f5fd7e7a782982169529c9a42cbbe6d476 /kafka/coordinator/protocol.py
parentcc22d1bab82fd234f2a47d347152a321aaa0b53e (diff)
downloadkafka-python-b7104957f7294d3cb0e47d47ff1b6710acf5653e.tar.gz
Move ConsumerProtocol definition to kafka.coordinator.protocol
Diffstat (limited to 'kafka/coordinator/protocol.py')
-rw-r--r--kafka/coordinator/protocol.py33
1 files changed, 33 insertions, 0 deletions
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
new file mode 100644
index 0000000..9af7225
--- /dev/null
+++ b/kafka/coordinator/protocol.py
@@ -0,0 +1,33 @@
+from __future__ import absolute_import
+
+from kafka.common import TopicPartition
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))),
+ ('user_data', Bytes))
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes))
+
+ def partitions(self):
+ return [TopicPartition(topic, partition)
+ for topic, partitions in self.assignment # pylint: disable-msg=no-member
+ for partition in partitions]
+
+
+class ConsumerProtocol(object):
+ PROTOCOL_TYPE = 'consumer'
+ ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ METADATA = ConsumerProtocolMemberMetadata
+ ASSIGNMENT = ConsumerProtocolMemberAssignment