From 18ac14860791db2382c3e62715f11a6f657f265a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Sep 2014 01:48:18 -0700 Subject: Improve metadata protocol handling - add MetadataRequest and MetadataResponse namedtuples - add TopicMetadata namedtuple - add error codes to Topic and Partition Metadata - add KafkaClient.send_metadata_request() method - KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list] --- kafka/common.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) (limited to 'kafka/common.py') diff --git a/kafka/common.py b/kafka/common.py index 907e128..e1713cf 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest", OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) +MetadataRequest = namedtuple("MetadataRequest", + ["topics"]) + OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) +MetadataResponse = namedtuple("MetadataResponse", + ["brokers", "topics"]) + # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) @@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) -BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -PartitionMetadata = namedtuple("PartitionMetadata", - ["topic", "partition", "leader", - "replicas", "isr"]) # Other useful structs -OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) -Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +BrokerMetadata = namedtuple("BrokerMetadata", + ["nodeId", "host", "port"]) + +TopicMetadata = namedtuple("TopicMetadata", + ["topic", "error", "partitions"]) + +PartitionMetadata = namedtuple("PartitionMetadata", + ["topic", "partition", "leader", "replicas", "isr", "error"]) + +OffsetAndMessage = namedtuple("OffsetAndMessage", + ["offset", "message"]) + +Message = namedtuple("Message", + ["magic", "attributes", "key", "value"]) + +TopicAndPartition = namedtuple("TopicAndPartition", + ["topic", "partition"]) ################# -- cgit v1.2.1 From 6c0d9137eae9bd875a7a7ab70d3285dd74f6701b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Sep 2014 02:25:05 -0700 Subject: if error code is not recognized in check_error: raise UnknownError --- kafka/common.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'kafka/common.py') diff --git a/kafka/common.py b/kafka/common.py index e1713cf..6b73410 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -76,6 +76,9 @@ class KafkaError(RuntimeError): class BrokerResponseError(KafkaError): pass +class NoError(BrokerResponseError): + errno = 0 + message = 'SUCCESS' class UnknownError(BrokerResponseError): errno = -1 @@ -197,6 +200,7 @@ class UnsupportedCodecError(KafkaError): kafka_errors = { -1 : UnknownError, + 0 : NoError, 1 : OffsetOutOfRangeError, 2 : InvalidMessageError, 3 : UnknownTopicOrPartitionError, @@ -214,7 +218,7 @@ kafka_errors = { def check_error(response): - error = kafka_errors.get(response.error) - if error: + error = kafka_errors.get(response.error, UnknownError) + if error is not NoError: raise error(response) -- cgit v1.2.1 From bebe7b663894c96d407b3b65725c8779c3b3af4d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Sep 2014 02:28:13 -0700 Subject: No more custom LeaderUnavailableError exception -- raise LeaderNotAvailableError --- kafka/common.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'kafka/common.py') diff --git a/kafka/common.py b/kafka/common.py index 6b73410..e8fa31e 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -158,10 +158,6 @@ class KafkaTimeoutError(KafkaError): pass -class LeaderUnavailableError(KafkaError): - pass - - class PartitionUnavailableError(KafkaError): pass -- cgit v1.2.1 From 945ecbcee7d2844ebbfa407b1542109fd8518cde Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Sep 2014 17:03:46 -0700 Subject: Use standard exceptions in client._get_leader_for_partition() - drop custom PartitionUnavailable exception - raise UnknownTopicOrPartitionError or LeaderNotAvailableError - add tests for exception raises --- kafka/common.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'kafka/common.py') diff --git a/kafka/common.py b/kafka/common.py index e8fa31e..008736c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -158,10 +158,6 @@ class KafkaTimeoutError(KafkaError): pass -class PartitionUnavailableError(KafkaError): - pass - - class FailedPayloadsError(KafkaError): pass -- cgit v1.2.1