summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/common.py36
-rw-r--r--kafka/conn.py19
3 files changed, 35 insertions, 26 deletions
diff --git a/kafka/client.py b/kafka/client.py
index ab0eb8d..39c89ba 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -5,7 +5,7 @@ from collections import defaultdict
from functools import partial
from itertools import count
-from kafka.common import (ErrorMapping, TopicAndPartition,
+from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
LeaderUnavailableError,
@@ -199,8 +199,8 @@ class KafkaClient(object):
self.reset_topic_metadata(resp.topic)
raise BrokerResponseError(
- "Request for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error))
+ "Request for %s failed with errorcode=%d (%s)" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
#################
# Public API #
diff --git a/kafka/common.py b/kafka/common.py
index b4fe5c7..005e6dd 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,22 +48,28 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+ErrorStrings = {
+ -1 : 'UNKNOWN',
+ 0 : 'NO_ERROR',
+ 1 : 'OFFSET_OUT_OF_RANGE',
+ 2 : 'INVALID_MESSAGE',
+ 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
+ 4 : 'INVALID_FETCH_SIZE',
+ 5 : 'LEADER_NOT_AVAILABLE',
+ 6 : 'NOT_LEADER_FOR_PARTITION',
+ 7 : 'REQUEST_TIMED_OUT',
+ 8 : 'BROKER_NOT_AVAILABLE',
+ 9 : 'REPLICA_NOT_AVAILABLE',
+ 10 : 'MESSAGE_SIZE_TOO_LARGE',
+ 11 : 'STALE_CONTROLLER_EPOCH',
+ 12 : 'OFFSET_METADATA_TOO_LARGE',
+}
+
class ErrorMapping(object):
- # Many of these are not actually used by the client
- UNKNOWN = -1
- NO_ERROR = 0
- OFFSET_OUT_OF_RANGE = 1
- INVALID_MESSAGE = 2
- UNKNOWN_TOPIC_OR_PARTITON = 3
- INVALID_FETCH_SIZE = 4
- LEADER_NOT_AVAILABLE = 5
- NOT_LEADER_FOR_PARTITION = 6
- REQUEST_TIMED_OUT = 7
- BROKER_NOT_AVAILABLE = 8
- REPLICA_NOT_AVAILABLE = 9
- MESSAGE_SIZE_TO_LARGE = 10
- STALE_CONTROLLER_EPOCH = 11
- OFFSET_METADATA_TOO_LARGE = 12
+ pass
+
+for k, v in ErrorStrings.items():
+ setattr(ErrorMapping, v, k)
#################
# Exceptions #
diff --git a/kafka/conn.py b/kafka/conn.py
index 7538e8d..4fdeb17 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -54,11 +54,10 @@ class KafkaConnection(local):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
self.timeout = timeout
- self._sock.settimeout(self.timeout)
- self._dirty = False
+ self._sock = None
+
+ self.reinit()
def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -73,24 +72,28 @@ class KafkaConnection(local):
def _read_bytes(self, num_bytes):
bytes_left = num_bytes
- resp = ''
+ responses = []
+
log.debug("About to read %d bytes from Kafka", num_bytes)
if self._dirty:
self.reinit()
+
while bytes_left:
try:
- data = self._sock.recv(bytes_left)
+ data = self._sock.recv(min(bytes_left, 4096))
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
+
if data == '':
log.error("Not enough data to read this response")
self._raise_connection_error()
+
bytes_left -= len(data)
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
- resp += data
+ responses.append(data)
- return resp
+ return ''.join(responses)
##################
# Public API #