diff options
Diffstat (limited to 'kafka/common.py')
-rw-r--r-- | kafka/common.py | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/kafka/common.py b/kafka/common.py index 907e128..008736c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest", OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) +MetadataRequest = namedtuple("MetadataRequest", + ["topics"]) + OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) +MetadataResponse = namedtuple("MetadataResponse", + ["brokers", "topics"]) + # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) @@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) -BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -PartitionMetadata = namedtuple("PartitionMetadata", - ["topic", "partition", "leader", - "replicas", "isr"]) # Other useful structs -OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) -Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +BrokerMetadata = namedtuple("BrokerMetadata", + ["nodeId", "host", "port"]) + +TopicMetadata = namedtuple("TopicMetadata", + ["topic", "error", "partitions"]) + +PartitionMetadata = namedtuple("PartitionMetadata", + ["topic", "partition", "leader", "replicas", "isr", "error"]) + +OffsetAndMessage = namedtuple("OffsetAndMessage", + ["offset", "message"]) + +Message = namedtuple("Message", + ["magic", "attributes", "key", "value"]) + +TopicAndPartition = namedtuple("TopicAndPartition", + ["topic", "partition"]) ################# @@ -60,6 +76,9 @@ class KafkaError(RuntimeError): class BrokerResponseError(KafkaError): pass +class NoError(BrokerResponseError): + errno = 0 + message = 'SUCCESS' class UnknownError(BrokerResponseError): errno = -1 @@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError): pass -class LeaderUnavailableError(KafkaError): - pass - - -class PartitionUnavailableError(KafkaError): - pass - - class FailedPayloadsError(KafkaError): pass @@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError): kafka_errors = { -1 : UnknownError, + 0 : NoError, 1 : OffsetOutOfRangeError, 2 : InvalidMessageError, 3 : UnknownTopicOrPartitionError, @@ -198,7 +210,7 @@ kafka_errors = { def check_error(response): - error = kafka_errors.get(response.error) - if error: + error = kafka_errors.get(response.error, UnknownError) + if error is not NoError: raise error(response) |