diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:15:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:17:43 -0700 |
commit | fff812ddc80243208233f785b3f005904cf33482 (patch) | |
tree | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/protocol.py | |
parent | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff) | |
parent | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff) | |
download | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
MetadataResponse object so that it is consistent with server api:
[broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
unless topics is null (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
PartitionUnavailable) with server standard exceptions
(LeaderNotAvailableError, UnknownTopicOrPartitionError)
Conflicts:
kafka/client.py
test/test_client.py
test/test_producer_integration.py
test/test_protocol.py
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index e5356c5..9e01f5a 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -9,11 +9,12 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.common import ( - BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage, - ProduceResponse, FetchResponse, OffsetResponse, - OffsetCommitResponse, OffsetFetchResponse, ProtocolError, - BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall, - UnsupportedCodecError + Message, OffsetAndMessage, TopicAndPartition, + BrokerMetadata, TopicMetadata, PartitionMetadata, + MetadataResponse, ProduceResponse, FetchResponse, + OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, + ProtocolError, BufferUnderflowError, ChecksumError, + ConsumerFetchSizeTooSmall, UnsupportedCodecError ) from kafka.util import ( crc32, read_short_string, read_int_string, relative_unpack, @@ -343,7 +344,8 @@ class KafkaProtocol(object): yield OffsetResponse(topic, partition, error, tuple(offsets)) @classmethod - def encode_metadata_request(cls, client_id, correlation_id, topics=None): + def encode_metadata_request(cls, client_id, correlation_id, topics=None, + payloads=None): """ Encode a MetadataRequest @@ -353,7 +355,11 @@ class KafkaProtocol(object): correlation_id: int topics: list of strings """ - topics = [] if topics is None else topics + if payloads is None: + topics = [] if topics is None else topics + else: + topics = payloads + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY) @@ -376,28 +382,24 @@ class KafkaProtocol(object): ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) # Broker info - brokers = {} + brokers = [] for i in range(numbrokers): ((nodeId, ), cur) = relative_unpack('>i', data, cur) (host, cur) = read_short_string(data, cur) ((port,), cur) = relative_unpack('>i', data, cur) - brokers[nodeId] = BrokerMetadata(nodeId, host, port) + brokers.append(BrokerMetadata(nodeId, host, port)) # Topic info ((num_topics,), cur) = relative_unpack('>i', data, cur) - topic_metadata = {} + topic_metadata = [] for i in range(num_topics): - # NOTE: topic_error is discarded. Should probably be returned with - # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) - partition_metadata = {} + partition_metadata = [] for j in range(num_partitions): - # NOTE: partition_error_code is discarded. Should probably be - # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -407,13 +409,16 @@ class KafkaProtocol(object): ((num_isr,), cur) = relative_unpack('>i', data, cur) (isr, cur) = relative_unpack('>%di' % num_isr, data, cur) - partition_metadata[partition] = \ - PartitionMetadata( - topic_name, partition, leader, replicas, isr) + partition_metadata.append( + PartitionMetadata(topic_name, partition, leader, + replicas, isr, partition_error_code) + ) - topic_metadata[topic_name] = partition_metadata + topic_metadata.append( + TopicMetadata(topic_name, topic_error, partition_metadata) + ) - return brokers, topic_metadata + return MetadataResponse(brokers, topic_metadata) @classmethod def encode_offset_commit_request(cls, client_id, correlation_id, |