diff options
author    Dana Powers <dana.powers@rd.io>    2015-12-10 10:57:27 -0800
committer Dana Powers <dana.powers@rd.io>    2015-12-10 11:25:15 -0800
commit    7470cade6bb8629d17541e136527369f9d2ec387 (patch)
tree      ba424c4d0cc27ffb5ec77196a0e121f3075bc992 /kafka
parent    c3d2fda3c368771cb93a09bb2f1edaa7a3cf9c2b (diff)
download  kafka-python-7470cade6bb8629d17541e136527369f9d2ec387.tar.gz
Convert OffsetCommit and OffsetFetch protocol encode/decode
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/common.py            16
-rw-r--r--  kafka/consumer/base.py      6
-rw-r--r--  kafka/consumer/kafka.py     6
-rw-r--r--  kafka/context.py            4
-rw-r--r--  kafka/protocol/legacy.py  128
5 files changed, 62 insertions, 98 deletions
diff --git a/kafka/common.py b/kafka/common.py index 7ae3294..4fc1e19 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -28,30 +28,30 @@ ProduceResponsePayload = namedtuple("ProduceResponsePayload", ["topic", "partition", "error", "offset"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI -FetchRequestPayload = namedtuple("FetchRequest", +FetchRequestPayload = namedtuple("FetchRequestPayload", ["topic", "partition", "offset", "max_bytes"]) -FetchResponsePayload = namedtuple("FetchResponse", +FetchResponsePayload = namedtuple("FetchResponsePayload", ["topic", "partition", "error", "highwaterMark", "messages"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI -OffsetRequestPayload = namedtuple("OffsetRequest", +OffsetRequestPayload = namedtuple("OffsetRequestPayload", ["topic", "partition", "time", "max_offsets"]) -OffsetResponsePayload = namedtuple("OffsetResponse", +OffsetResponsePayload = namedtuple("OffsetResponsePayload", ["topic", "partition", "error", "offsets"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI -OffsetCommitRequest = namedtuple("OffsetCommitRequest", +OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload", ["topic", "partition", "offset", "metadata"]) -OffsetCommitResponse = namedtuple("OffsetCommitResponse", +OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload", ["topic", "partition", "error"]) -OffsetFetchRequest = namedtuple("OffsetFetchRequest", +OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload", ["topic", "partition"]) -OffsetFetchResponse = namedtuple("OffsetFetchResponse", +OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", ["topic", "partition", "offset", "metadata", "error"]) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 
034d35c..5859d36 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, UnknownTopicOrPartitionError, check_error, KafkaError ) @@ -101,7 +101,7 @@ class Consumer(object): responses = self.client.send_offset_fetch_request( self.group, - [OffsetFetchRequest(self.topic, p) for p in partitions], + [OffsetFetchRequestPayload(self.topic, p) for p in partitions], fail_on_error=False ) @@ -155,7 +155,7 @@ class Consumer(object): 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) - reqs.append(OffsetCommitRequest(self.topic, partition, + reqs.append(OffsetCommitRequestPayload(self.topic, partition, offset, None)) try: diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 1bd3def..fa70124 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,7 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, + OffsetFetchRequestPayload, OffsetCommitRequestPayload, OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, @@ -546,7 +546,7 @@ class KafkaConsumer(object): continue commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], + OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], commit_offset, metadata) ) @@ -618,7 +618,7 @@ class KafkaConsumer(object): for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( kafka_bytestring(self._config['group_id']), - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: 
check_error(resp) diff --git a/kafka/context.py b/kafka/context.py index ade4db8..376fad1 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -3,7 +3,7 @@ Context manager to commit/rollback consumer offsets. """ from logging import getLogger -from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError +from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError class OffsetCommitContext(object): @@ -139,7 +139,7 @@ class OffsetCommitContext(object): self.logger.debug("Committing partition offsets: %s", partition_offsets) commit_requests = [ - OffsetCommitRequest(self.consumer.topic, partition, offset, None) + OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None) for partition, offset in partition_offsets.items() ] commit_responses = self.consumer.client.send_offset_commit_request( diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index c5babf7..feabed3 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -19,7 +19,6 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.common import ( - OffsetCommitResponse, OffsetFetchResponse, ProtocolError, ChecksumError, UnsupportedCodecError, ConsumerMetadataResponse @@ -258,8 +257,8 @@ class KafkaProtocol(object): partition, payload.time, payload.max_offsets) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod def decode_offset_response(cls, response): @@ -327,115 +326,80 @@ class KafkaProtocol(object): return ConsumerMetadataResponse(error, nodeId, host, port) @classmethod - def encode_offset_commit_request(cls, client_id, correlation_id, - group, payloads): + def encode_offset_commit_request(cls, group, payloads): """ - Encode 
some OffsetCommitRequest structs + Encode an OffsetCommitRequest struct Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + payloads: list of OffsetCommitRequestPayload """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_COMMIT_KEY)) - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>iq', partition, payload.offset)) - message.append(write_short_string(payload.metadata)) + return kafka.protocol.commit.OffsetCommitRequest_v0( + consumer_group=group, + topics=[( + topic, + [( + partition, + payload.offset, + payload.metadata) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod - def decode_offset_commit_response(cls, data): + def decode_offset_commit_response(cls, response): """ - Decode bytes to an OffsetCommitResponse + Decode OffsetCommitResponse to an OffsetCommitResponsePayload Arguments: - data: bytes to decode + response: OffsetCommitResponse """ - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_partitions): - ((partition, error), cur) = relative_unpack('>ih', data, cur) - yield OffsetCommitResponse(topic, partition, error) + 
return [ + kafka.common.OffsetCommitResponsePayload(topic, partition, error) + for topic, partitions in response.topics + for partition, error in partitions + ] @classmethod - def encode_offset_fetch_request(cls, client_id, correlation_id, - group, payloads, from_kafka=False): + def encode_offset_fetch_request(cls, group, payloads, from_kafka=False): """ - Encode some OffsetFetchRequest structs. The request is encoded using + Encode an OffsetFetchRequest struct. The request is encoded using version 0 if from_kafka is false, indicating a request for Zookeeper offsets. It is encoded using version 1 otherwise, indicating a request for Kafka offsets. Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + payloads: list of OffsetFetchRequestPayload from_kafka: bool, default False, set True for Kafka-committed offsets """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - reqver = 1 if from_kafka else 0 - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_FETCH_KEY, - version=reqver)) - - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>i', partition)) + if from_kafka: + request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + else: + request_class = kafka.protocol.commit.OffsetFetchRequest_v0 - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) + return request_class( + consumer_group=group, + topics=[( + topic, + list(topic_payloads.keys())) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod - def 
decode_offset_fetch_response(cls, data): + def decode_offset_fetch_response(cls, response): """ - Decode bytes to an OffsetFetchResponse + Decode OffsetFetchResponse to OffsetFetchResponsePayloads Arguments: - data: bytes to decode + response: OffsetFetchResponse """ - - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_partitions): - ((partition, offset), cur) = relative_unpack('>iq', data, cur) - (metadata, cur) = read_short_string(data, cur) - ((error,), cur) = relative_unpack('>h', data, cur) - - yield OffsetFetchResponse(topic, partition, offset, - metadata, error) + return [ + kafka.common.OffsetFetchResponsePayload( + topic, partition, offset, metadata, error + ) + for topic, partitions in response.topics + for partition, offset, metadata, error in partitions + ] def create_message(payload, key=None): |