diff options
author | tpalino <tpalino@linkedin.com> | 2015-07-01 22:29:56 -0700 |
---|---|---|
committer | tpalino <tpalino@linkedin.com> | 2015-07-01 22:29:56 -0700 |
commit | c01963682e1ec16dbde9b55cda4ec95ad2346476 (patch) | |
tree | b4de368ad49a6d0e23be9ff7f3af64d1f3c4f890 /kafka/protocol.py | |
parent | adbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff) | |
download | kafka-python-c01963682e1ec16dbde9b55cda4ec95ad2346476.tar.gz |
Support consumer metadata requests
Support added for ConsumerMetadataRequest and ConsumerMetadataResponse
Added consumer-aware request routine for supporting the consumer coordinator
Added separate client method for fetching Kafka-committed offsets from the coordinator
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 53 |
1 files changed, 47 insertions, 6 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index d5adf89..1f3ea2f 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -14,7 +14,8 @@ from kafka.common import ( MetadataResponse, ProduceResponse, FetchResponse, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProtocolError, BufferUnderflowError, ChecksumError, - ConsumerFetchSizeTooSmall, UnsupportedCodecError + ConsumerFetchSizeTooSmall, UnsupportedCodecError, + ConsumerMetadataResponse ) from kafka.util import ( crc32, read_short_string, read_int_string, relative_unpack, @@ -43,19 +44,21 @@ class KafkaProtocol(object): METADATA_KEY = 3 OFFSET_COMMIT_KEY = 8 OFFSET_FETCH_KEY = 9 + CONSUMER_METADATA_KEY = 10 ################### # Private API # ################### @classmethod - def _encode_message_header(cls, client_id, correlation_id, request_key): + def _encode_message_header(cls, client_id, correlation_id, request_key, + version=0): """ Encode the common request envelope """ return struct.pack('>hhih%ds' % len(client_id), request_key, # ApiKey - 0, # ApiVersion + version, # ApiVersion correlation_id, # CorrelationId len(client_id), # ClientId size client_id) # ClientId @@ -430,6 +433,38 @@ class KafkaProtocol(object): return MetadataResponse(brokers, topic_metadata) @classmethod + def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads): + """ + Encode a ConsumerMetadataRequest + + Arguments: + client_id: string + correlation_id: int + payloads: string (consumer group) + """ + message = [] + message.append(cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.CONSUMER_METADATA_KEY)) + message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads)) + + msg = b''.join(message) + return write_int_string(msg) + + @classmethod + def decode_consumer_metadata_response(cls, data): + """ + Decode bytes to a ConsumerMetadataResponse + + Arguments: + data: bytes to decode + """ + ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0) + (host, cur) = read_short_string(data, cur) + ((port,), cur) = relative_unpack('>i', data, cur) + + return ConsumerMetadataResponse(error, nodeId, host, port) + + @classmethod def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads): """ @@ -481,21 +516,27 @@ class KafkaProtocol(object): @classmethod def encode_offset_fetch_request(cls, client_id, correlation_id, - group, payloads): + group, payloads, from_kafka=False): """ - Encode some OffsetFetchRequest structs + Encode some OffsetFetchRequest structs. The request is encoded using + version 0 if from_kafka is false, indicating a request for Zookeeper + offsets. It is encoded using version 1 otherwise, indicating a request + for Kafka offsets. Arguments: client_id: string correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest + from_kafka: bool, default False, set True for Kafka-committed offsets """ grouped_payloads = group_by_topic_and_partition(payloads) message = [] + reqver = 1 if from_kafka else 0 message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_FETCH_KEY)) + KafkaProtocol.OFFSET_FETCH_KEY, + version=reqver)) message.append(write_short_string(group)) message.append(struct.pack('>i', len(grouped_payloads))) |