summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-08 13:15:56 -0700
committerDana Powers <dana.powers@rd.io>2014-09-08 13:17:43 -0700
commitfff812ddc80243208233f785b3f005904cf33482 (patch)
tree30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/common.py
parent42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff)
parent0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff)
downloadkafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor * add MetadataRequest and MetadataResponse namedtuples * add TopicMetadata namedtuple * add error codes to Topic and Partition Metadata * add KafkaClient.send_metadata_request() method * KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list] * raise server exceptions in load_metadata_for_topics(*topics) unless topics is null (full refresh) * Replace non-standard exceptions (LeaderUnavailable, PartitionUnavailable) with server standard exceptions (LeaderNotAvailableError, UnknownTopicOrPartitionError) Conflicts: kafka/client.py test/test_client.py test/test_producer_integration.py test/test_protocol.py
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py46
1 files changed, 29 insertions, 17 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 907e128..008736c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
+MetadataRequest = namedtuple("MetadataRequest",
+ ["topics"])
+
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
+MetadataResponse = namedtuple("MetadataResponse",
+ ["brokers", "topics"])
+
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset",
"metadata", "error"])
-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata",
- ["topic", "partition", "leader",
- "replicas", "isr"])
# Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+ ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+ ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+ ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+ ["offset", "message"])
+
+Message = namedtuple("Message",
+ ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+ ["topic", "partition"])
#################
@@ -60,6 +76,9 @@ class KafkaError(RuntimeError):
class BrokerResponseError(KafkaError):
pass
+class NoError(BrokerResponseError):
+ errno = 0
+ message = 'SUCCESS'
class UnknownError(BrokerResponseError):
errno = -1
@@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError):
pass
-class LeaderUnavailableError(KafkaError):
- pass
-
-
-class PartitionUnavailableError(KafkaError):
- pass
-
-
class FailedPayloadsError(KafkaError):
pass
@@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError):
kafka_errors = {
-1 : UnknownError,
+ 0 : NoError,
1 : OffsetOutOfRangeError,
2 : InvalidMessageError,
3 : UnknownTopicOrPartitionError,
@@ -198,7 +210,7 @@ kafka_errors = {
def check_error(response):
- error = kafka_errors.get(response.error)
- if error:
+ error = kafka_errors.get(response.error, UnknownError)
+ if error is not NoError:
raise error(response)