author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:20:57 -0800
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:23:20 -0800
commit | d0de279459a92e787730f5c85a2cf6f2741cbd97 (patch)
tree | 7b0c9995742097c40039697f3ac36e016a49bd36 /kafka/protocol/legacy.py
parent | 9740b2b88b41726f143b3367285dbc118bfa0a8a (diff)
download | kafka-python-d0de279459a92e787730f5c85a2cf6f2741cbd97.tar.gz
Change KafkaProtocol to encode/decode Structs, not bytes
- add Payload to kafka.common Request/Response namedtuples
- OffsetFetch and OffsetCommit still need to be converted
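In practice the calling convention changes roughly as sketched below. This is an illustrative sketch, not code from the commit: it assumes the `ProduceRequestPayload` namedtuple named in the new docstrings (fields `topic`, `partition`, `messages`, mirroring the old `ProduceRequest` namedtuple) and that the new `kafka.protocol` Structs serialize via an `encode()` method.

```python
from kafka.common import ProduceRequestPayload
from kafka.protocol.legacy import KafkaProtocol, create_message

# Group messages into a legacy-style payload, much as before.
payloads = [ProduceRequestPayload(
    topic='test-topic', partition=0,
    messages=[create_message(b'hello')])]

# The encoder no longer takes client_id/correlation_id and no longer
# returns raw bytes: it builds a ProduceRequest Struct, leaving framing
# and serialization to the connection layer.
request = KafkaProtocol.encode_produce_request(payloads, acks=1, timeout=1000)
raw_bytes = request.encode()  # assumed Struct serialization entry point
```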
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 352
1 file changed, 106 insertions(+), 246 deletions(-)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index db9f3e0..c5babf7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,16 +7,21 @@ import six
 from six.moves import xrange
 
+import kafka.common
+import kafka.protocol.commit
+import kafka.protocol.fetch
+import kafka.protocol.message
+import kafka.protocol.metadata
+import kafka.protocol.offset
+import kafka.protocol.produce
+
 from kafka.codec import (
     gzip_encode, gzip_decode, snappy_encode, snappy_decode
 )
 from kafka.common import (
-    Message, OffsetAndMessage, TopicAndPartition,
-    BrokerMetadata, TopicMetadata, PartitionMetadata,
-    MetadataResponse, ProduceResponse, FetchResponse,
-    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
-    ProtocolError, BufferUnderflowError, ChecksumError,
-    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+    OffsetCommitResponse, OffsetFetchResponse,
+    ProtocolError, ChecksumError,
+    UnsupportedCodecError,
     ConsumerMetadataResponse
 )
 from kafka.util import (
@@ -115,41 +120,6 @@ class KafkaProtocol(object):
         return msg
 
     @classmethod
-    def _decode_message_set_iter(cls, data):
-        """
-        Iteratively decode a MessageSet
-
-        Reads repeated elements of (offset, message), calling decode_message
-        to decode a single message. Since compressed messages contain futher
-        MessageSets, these two methods have been decoupled so that they may
-        recurse easily.
-        """
-        cur = 0
-        read_message = False
-        while cur < len(data):
-            try:
-                ((offset, ), cur) = relative_unpack('>q', data, cur)
-                (msg, cur) = read_int_string(data, cur)
-                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
-                    read_message = True
-                    yield OffsetAndMessage(offset, message)
-            except BufferUnderflowError:
-                # NOTE: Not sure this is correct error handling:
-                # Is it possible to get a BUE if the message set is somewhere
-                # in the middle of the fetch response? If so, we probably have
-                # an issue that's not fetch size too small.
-                # Aren't we ignoring errors if we fail to unpack data by
-                # raising StopIteration()?
-                # If _decode_message() raises a ChecksumError, couldn't that
-                # also be due to the fetch size being too small?
-                if read_message is False:
-                    # If we get a partial read of a message, but haven't
-                    # yielded anything there's a problem
-                    raise ConsumerFetchSizeTooSmall()
-                else:
-                    raise StopIteration()
-
-    @classmethod
     def _decode_message(cls, data, offset):
         """
         Decode a single Message
@@ -169,7 +139,7 @@ class KafkaProtocol(object):
         codec = att & ATTRIBUTE_CODEC_MASK
 
         if codec == CODEC_NONE:
-            yield (offset, Message(magic, att, key, value))
+            yield (offset, kafka.common.Message(magic, att, key, value))
 
         elif codec == CODEC_GZIP:
             gz = gzip_decode(value)
@@ -186,253 +156,143 @@ class KafkaProtocol(object):
     ##################
 
     @classmethod
-    def encode_produce_request(cls, client_id, correlation_id,
-                               payloads=None, acks=1, timeout=1000):
+    def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
         """
-        Encode some ProduceRequest structs
+        Encode a ProduceRequest struct
 
         Arguments:
-            client_id: string
-            correlation_id: int
-            payloads: list of ProduceRequest
+            payloads: list of ProduceRequestPayload
             acks: How "acky" you want the request to be
-                0: immediate response
                 1: written to disk by the leader
-                2+: waits for this many number of replicas to sync
+                0: immediate response
                 -1: waits for all replicas to be in sync
-            timeout: Maximum time the server will wait for acks from replicas.
+            timeout: Maximum time (in ms) the server will wait for replica acks.
                 This is _not_ a socket timeout
-        """
-        payloads = [] if payloads is None else payloads
-        grouped_payloads = group_by_topic_and_partition(payloads)
-
-        message = []
-        message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.PRODUCE_KEY))
-
-        message.append(struct.pack('>hii', acks, timeout,
-                                   len(grouped_payloads)))
-
-        for topic, topic_payloads in grouped_payloads.items():
-            message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
-                                       len(topic_payloads)))
-
-            for partition, payload in topic_payloads.items():
-                msg_set = KafkaProtocol._encode_message_set(payload.messages)
-                message.append(struct.pack('>ii%ds' % len(msg_set), partition,
-                                           len(msg_set), msg_set))
-
-        msg = b''.join(message)
-        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+        Returns: ProduceRequest
+        """
+        if acks not in (1, 0, -1):
+            raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+
+        return kafka.protocol.produce.ProduceRequest(
+            required_acks=acks,
+            timeout=timeout,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
+                                                           magic=msg.magic,
+                                                           attributes=msg.attributes))
+                     for msg in payload.messages])
+                 for partition, payload in topic_payloads.items()])
+                for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
 
     @classmethod
-    def decode_produce_response(cls, data):
+    def decode_produce_response(cls, response):
         """
-        Decode bytes to a ProduceResponse
+        Decode ProduceResponse to ProduceResponsePayload
 
         Arguments:
-            data: bytes to decode
+            response: ProduceResponse
+
+        Return: list of ProduceResponsePayload
         """
-        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
-
-        for _ in range(num_topics):
-            ((strlen,), cur) = relative_unpack('>h', data, cur)
-            topic = data[cur:cur + strlen]
-            cur += strlen
-            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            for _ in range(num_partitions):
-                ((partition, error, offset), cur) = relative_unpack('>ihq',
-                                                                    data, cur)
-
-                yield ProduceResponse(topic, partition, error, offset)
+        return [
+            kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+            for topic, partitions in response.topics
+            for partition, error, offset in partitions
+        ]
 
     @classmethod
-    def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
-                             max_wait_time=100, min_bytes=4096):
+    def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
         """
-        Encodes some FetchRequest structs
+        Encodes a FetchRequest struct
 
         Arguments:
-            client_id: string
-            correlation_id: int
-            payloads: list of FetchRequest
-            max_wait_time: int, how long to block waiting on min_bytes of data
-            min_bytes: int, the minimum number of bytes to accumulate before
-                       returning the response
-        """
-
-        payloads = [] if payloads is None else payloads
-        grouped_payloads = group_by_topic_and_partition(payloads)
-
-        message = []
-        message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.FETCH_KEY))
-
-        # -1 is the replica id
-        message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
-                                   len(grouped_payloads)))
-
-        for topic, topic_payloads in grouped_payloads.items():
-            message.append(write_short_string(topic))
-            message.append(struct.pack('>i', len(topic_payloads)))
-            for partition, payload in topic_payloads.items():
-                message.append(struct.pack('>iqi', partition, payload.offset,
-                                           payload.max_bytes))
-
-        msg = b''.join(message)
-        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+            payloads: list of FetchRequestPayload
+            max_wait_time (int, optional): ms to block waiting for min_bytes
+                data. Defaults to 100.
+            min_bytes (int, optional): minimum bytes required to return before
+                max_wait_time. Defaults to 4096.
+
+        Return: FetchRequest
+        """
+        return kafka.protocol.fetch.FetchRequest(
+            replica_id=-1,
+            max_wait_time=max_wait_time,
+            min_bytes=min_bytes,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.offset,
+                    payload.max_bytes)
+                 for partition, payload in topic_payloads.items()])
+                for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
 
     @classmethod
-    def decode_fetch_response(cls, data):
+    def decode_fetch_response(cls, response):
        """
-        Decode bytes to a FetchResponse
+        Decode FetchResponse struct to FetchResponsePayloads
 
         Arguments:
-            data: bytes to decode
+            response: FetchResponse
        """
-        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
-
-        for _ in range(num_topics):
-            (topic, cur) = read_short_string(data, cur)
-            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
-            for j in range(num_partitions):
-                ((partition, error, highwater_mark_offset), cur) = \
-                    relative_unpack('>ihq', data, cur)
-
-                (message_set, cur) = read_int_string(data, cur)
-
-                yield FetchResponse(
-                    topic, partition, error,
-                    highwater_mark_offset,
-                    KafkaProtocol._decode_message_set_iter(message_set))
+        return [
+            kafka.common.FetchResponsePayload(
+                topic, partition, error, highwater_offset, [
+                    kafka.common.OffsetAndMessage(offset, message)
+                    for offset, _, message in messages])
+            for topic, partitions in response.topics
+            for partition, error, highwater_offset, messages in partitions
+        ]
 
     @classmethod
-    def encode_offset_request(cls, client_id, correlation_id, payloads=None):
-        payloads = [] if payloads is None else payloads
-        grouped_payloads = group_by_topic_and_partition(payloads)
-
-        message = []
-        message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.OFFSET_KEY))
-
-        # -1 is the replica id
-        message.append(struct.pack('>ii', -1, len(grouped_payloads)))
-
-        for topic, topic_payloads in grouped_payloads.items():
-            message.append(write_short_string(topic))
-            message.append(struct.pack('>i', len(topic_payloads)))
-
-            for partition, payload in topic_payloads.items():
-                message.append(struct.pack('>iqi', partition, payload.time,
-                                           payload.max_offsets))
-
-        msg = b''.join(message)
-        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+    def encode_offset_request(cls, payloads=()):
+        return kafka.protocol.offset.OffsetRequest(
+            replica_id=-1,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.time,
+                    payload.max_offsets)
+                 for partition, payload in six.iteritems(topic_payloads)])
+                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
 
     @classmethod
-    def decode_offset_response(cls, data):
        """
-        Decode bytes to an OffsetResponse
+        Decode OffsetResponse into OffsetResponsePayloads
 
         Arguments:
-            data: bytes to decode
-        """
-        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+            response: OffsetResponse
 
-        for _ in range(num_topics):
-            (topic, cur) = read_short_string(data, cur)
-            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
-            for _ in range(num_partitions):
-                ((partition, error, num_offsets,), cur) = \
-                    relative_unpack('>ihi', data, cur)
-
-                offsets = []
-                for k in range(num_offsets):
-                    ((offset,), cur) = relative_unpack('>q', data, cur)
-                    offsets.append(offset)
-
-                yield OffsetResponse(topic, partition, error, tuple(offsets))
+        Returns: list of OffsetResponsePayloads
+        """
+        return [
+            kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+            for topic, partitions in response.topics
+            for partition, error, offsets in partitions
+        ]
 
     @classmethod
-    def encode_metadata_request(cls, client_id, correlation_id, topics=None,
-                                payloads=None):
+    def encode_metadata_request(cls, topics=(), payloads=None):
         """
         Encode a MetadataRequest
 
         Arguments:
-            client_id: string
-            correlation_id: int
             topics: list of strings
         """
-        if payloads is None:
-            topics = [] if topics is None else topics
-        else:
+        if payloads is not None:
             topics = payloads
 
-        message = []
-        message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.METADATA_KEY))
-
-        message.append(struct.pack('>i', len(topics)))
-
-        for topic in topics:
-            message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
-
-        msg = b''.join(message)
-        return write_int_string(msg)
+        return kafka.protocol.metadata.MetadataRequest(topics)
 
     @classmethod
-    def decode_metadata_response(cls, data):
-        """
-        Decode bytes to a MetadataResponse
-
-        Arguments:
-            data: bytes to decode
-        """
-        ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
-
-        # Broker info
-        brokers = []
-        for _ in range(numbrokers):
-            ((nodeId, ), cur) = relative_unpack('>i', data, cur)
-            (host, cur) = read_short_string(data, cur)
-            ((port,), cur) = relative_unpack('>i', data, cur)
-            brokers.append(BrokerMetadata(nodeId, host, port))
-
-        # Topic info
-        ((num_topics,), cur) = relative_unpack('>i', data, cur)
-        topic_metadata = []
-
-        for _ in range(num_topics):
-            ((topic_error,), cur) = relative_unpack('>h', data, cur)
-            (topic_name, cur) = read_short_string(data, cur)
-            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            partition_metadata = []
-
-            for _ in range(num_partitions):
-                ((partition_error_code, partition, leader, numReplicas), cur) = \
-                    relative_unpack('>hiii', data, cur)
-
-                (replicas, cur) = relative_unpack(
-                    '>%di' % numReplicas, data, cur)
-
-                ((num_isr,), cur) = relative_unpack('>i', data, cur)
-                (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
-
-                partition_metadata.append(
-                    PartitionMetadata(topic_name, partition, leader,
-                                      replicas, isr, partition_error_code)
-                )
-
-            topic_metadata.append(
-                TopicMetadata(topic_name, topic_error, partition_metadata)
-            )
-
-        return MetadataResponse(brokers, topic_metadata)
+    def decode_metadata_response(cls, response):
+        return response
 
     @classmethod
     def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
@@ -587,7 +447,7 @@ def create_message(payload, key=None):
         key: bytes, a key used for partition routing (optional)
 
     """
-    return Message(0, 0, key, payload)
+    return kafka.common.Message(0, 0, key, payload)
 
 
 def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -608,7 +468,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
     gzipped = gzip_encode(message_set, compresslevel=compresslevel)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
 
-    return Message(0, 0x00 | codec, key, gzipped)
+    return kafka.common.Message(0, 0x00 | codec, key, gzipped)
 
 
 def create_snappy_message(payloads, key=None):
@@ -629,7 +489,7 @@ def create_snappy_message(payloads, key=None):
    snapped = snappy_encode(message_set)
    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
 
-    return Message(0, 0x00 | codec, key, snapped)
+    return kafka.common.Message(0, 0x00 | codec, key, snapped)
 
 
 def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
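The decode path mirrors the encode change: responses arrive as already-decoded Structs and are reshaped into Payload namedtuples by list comprehensions rather than byte-offset parsing. A minimal sketch, hand-building a response Struct that would normally come off the wire; it assumes the Struct constructor accepts its schema fields as keyword arguments and that `ProduceResponsePayload` exposes the fields used in the comprehension above:

```python
import kafka.protocol.produce
from kafka.protocol.legacy import KafkaProtocol

# Hand-built stand-in for a broker response, shaped per the schema:
# topics -> [(topic, [(partition, error_code, offset)])]
response = kafka.protocol.produce.ProduceResponse(
    topics=[('test-topic', [(0, 0, 42)])])

# decode_produce_response() just maps Struct fields onto the legacy
# ProduceResponsePayload namedtuples; no byte parsing remains.
for p in KafkaProtocol.decode_produce_response(response):
    print(p.topic, p.partition, p.error, p.offset)
```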