summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-30 17:39:39 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-30 17:39:39 -0700
commit0d57c2718fcf3819f2c18911126f245e9e9ce3e0 (patch)
tree378fd3c881fccdec3b284c6a356e44dfe2e66e60 /kafka
parent57913f9f914a959f52bc9040a172f8c9ff77e491 (diff)
downloadkafka-python-0d57c2718fcf3819f2c18911126f245e9e9ce3e0.tar.gz
Make BrokerRequestError a base class, make subclasses for each broker error
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py36
-rw-r--r--kafka/common.py114
-rw-r--r--kafka/consumer.py16
3 files changed, 113 insertions, 53 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 65914a4..4870ab9 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,18 @@
import copy
import logging
+import collections
+
+import kafka.common
-from collections import defaultdict
from functools import partial
from itertools import count
+from kafka.common import *
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
- BrokerResponseError, PartitionUnavailableError,
- LeaderUnavailableError,
- KafkaUnavailableError)
+ PartitionUnavailableError,
+ LeaderUnavailableError, KafkaUnavailableError,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -39,6 +42,7 @@ class KafkaClient(object):
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
+
##################
# Private API #
##################
@@ -92,10 +96,9 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception, e:
+ except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
- continue
raise KafkaUnavailableError("All servers failed to process request")
@@ -123,7 +126,7 @@ class KafkaClient(object):
# Group the requests by topic+partition
original_keys = []
- payloads_by_broker = defaultdict(list)
+ payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
@@ -157,11 +160,11 @@ class KafkaClient(object):
continue
try:
response = conn.recv(requestId)
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -184,16 +187,11 @@ class KafkaClient(object):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
- if resp.error == ErrorMapping.NO_ERROR:
- return
-
- if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
- ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ try:
+ kafka.common.check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
self.reset_topic_metadata(resp.topic)
-
- raise BrokerResponseError(
- "Request for %s failed with errorcode=%d (%s)" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+ raise
#################
# Public API #
diff --git a/kafka/common.py b/kafka/common.py
index 830e34d..d288b89 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
-ErrorStrings = {
- -1 : 'UNKNOWN',
- 0 : 'NO_ERROR',
- 1 : 'OFFSET_OUT_OF_RANGE',
- 2 : 'INVALID_MESSAGE',
- 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
- 4 : 'INVALID_FETCH_SIZE',
- 5 : 'LEADER_NOT_AVAILABLE',
- 6 : 'NOT_LEADER_FOR_PARTITION',
- 7 : 'REQUEST_TIMED_OUT',
- 8 : 'BROKER_NOT_AVAILABLE',
- 9 : 'REPLICA_NOT_AVAILABLE',
- 10 : 'MESSAGE_SIZE_TOO_LARGE',
- 11 : 'STALE_CONTROLLER_EPOCH',
- 12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
- pass
-
-for k, v in ErrorStrings.items():
- setattr(ErrorMapping, v, k)
-
#################
# Exceptions #
#################
@@ -80,11 +57,76 @@ class KafkaError(RuntimeError):
pass
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
pass
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+ errno = -1
+ message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+ errno = 1
+ message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+ errno = 2
+ message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+ errno = 3
+ message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+ errno = 4
+ message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+ errno = 5
+ message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+ errno = 6
+ message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+ errno = 7
+ message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+ errno = 8
+ message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+ errno = 9
+ message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+ errno = 10
+ message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+ errno = 11
+ message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLarge(BrokerResponseError):
+ errno = 12
+ message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class KafkaUnavailableError(KafkaError):
pass
@@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError):
class ProtocolError(KafkaError):
pass
+
+kafka_errors = {
+ -1 : UnknownError,
+ 1 : OffsetOutOfRangeError,
+ 2 : InvalidMessageError,
+ 3 : UnknownTopicOrPartitionError,
+ 4 : InvalidFetchRequestError,
+ 5 : LeaderNotAvailableError,
+ 6 : NotLeaderForPartitionError,
+ 7 : RequestTimedOutError,
+ 8 : BrokerNotAvailableError,
+ 9 : ReplicaNotAvailableError,
+ 10 : MessageSizeTooLargeError,
+ 11 : StaleControllerEpochError,
+ 12 : OffsetMetadataTooLarge,
+}
+
+def check_error(response):
+ error = kafka_errors.get(response.error)
+ if error:
+ raise error(response)
+
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 98f18a0..43b8797 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,8 +8,9 @@ from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
+import kafka
from kafka.common import (
- ErrorMapping, FetchRequest,
+ FetchRequest,
OffsetRequest, OffsetCommitRequest,
OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
@@ -100,14 +101,11 @@ class Consumer(object):
self.commit_timer.start()
def get_or_init_offset_callback(resp):
- if resp.error == ErrorMapping.NO_ERROR:
+ try:
+ kafka.common.check_error(resp)
return resp.offset
- elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ except kafka.common.UnknownTopicOrPartitionError:
return 0
- else:
- raise ProtocolError("OffsetFetchRequest for topic=%s, "
- "partition=%d failed with errorcode=%s" % (
- resp.topic, resp.partition, resp.error))
if auto_commit:
for partition in partitions:
@@ -432,7 +430,7 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall, e:
+ except ConsumerFetchSizeTooSmall as e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
@@ -446,7 +444,7 @@ class SimpleConsumer(Consumer):
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", self.buffer_size)
retry_partitions.add(partition)
- except ConsumerNoMoreData, e:
+ except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition