summaryrefslogtreecommitdiff
path: root/kafka/protocol.py
diff options
context:
space:
mode:
authortpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
committertpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
commitc01963682e1ec16dbde9b55cda4ec95ad2346476 (patch)
treeb4de368ad49a6d0e23be9ff7f3af64d1f3c4f890 /kafka/protocol.py
parentadbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff)
downloadkafka-python-c01963682e1ec16dbde9b55cda4ec95ad2346476.tar.gz
Support consumer metadata requests
Support added for ConsumerMetadataRequest and ConsumerMetadataResponse. Added a consumer-aware request routine for supporting the consumer coordinator. Added a separate client method for fetching Kafka-committed offsets from the coordinator.
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--kafka/protocol.py53
1 file changed, 47 insertions, 6 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py
index d5adf89..1f3ea2f 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -14,7 +14,8 @@ from kafka.common import (
MetadataResponse, ProduceResponse, FetchResponse,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, BufferUnderflowError, ChecksumError,
- ConsumerFetchSizeTooSmall, UnsupportedCodecError
+ ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+ ConsumerMetadataResponse
)
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 8
OFFSET_FETCH_KEY = 9
+ CONSUMER_METADATA_KEY = 10
###################
# Private API #
###################
@classmethod
- def _encode_message_header(cls, client_id, correlation_id, request_key):
+ def _encode_message_header(cls, client_id, correlation_id, request_key,
+ version=0):
"""
Encode the common request envelope
"""
return struct.pack('>hhih%ds' % len(client_id),
request_key, # ApiKey
- 0, # ApiVersion
+ version, # ApiVersion
correlation_id, # CorrelationId
len(client_id), # ClientId size
client_id) # ClientId
@@ -430,6 +433,38 @@ class KafkaProtocol(object):
return MetadataResponse(brokers, topic_metadata)
@classmethod
+ def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+ """
+ Encode a ConsumerMetadataRequest
+
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: string (consumer group)
+ """
+ message = []
+ message.append(cls._encode_message_header(client_id, correlation_id,
+ KafkaProtocol.CONSUMER_METADATA_KEY))
+ message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+ msg = b''.join(message)
+ return write_int_string(msg)
+
+ @classmethod
+ def decode_consumer_metadata_response(cls, data):
+ """
+ Decode bytes to a ConsumerMetadataResponse
+
+ Arguments:
+ data: bytes to decode
+ """
+ ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+ (host, cur) = read_short_string(data, cur)
+ ((port,), cur) = relative_unpack('>i', data, cur)
+
+ return ConsumerMetadataResponse(error, nodeId, host, port)
+
+ @classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,
group, payloads):
"""
@@ -481,21 +516,27 @@ class KafkaProtocol(object):
@classmethod
def encode_offset_fetch_request(cls, client_id, correlation_id,
- group, payloads):
+ group, payloads, from_kafka=False):
"""
- Encode some OffsetFetchRequest structs
+ Encode some OffsetFetchRequest structs. The request is encoded using
+ version 0 if from_kafka is false, indicating a request for Zookeeper
+ offsets. It is encoded using version 1 otherwise, indicating a request
+ for Kafka offsets.
Arguments:
client_id: string
correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
+ from_kafka: bool, default False, set True for Kafka-committed offsets
"""
grouped_payloads = group_by_topic_and_partition(payloads)
message = []
+ reqver = 1 if from_kafka else 0
message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_FETCH_KEY))
+ KafkaProtocol.OFFSET_FETCH_KEY,
+ version=reqver))
message.append(write_short_string(group))
message.append(struct.pack('>i', len(grouped_payloads)))