author    Dana Powers <dana.powers@gmail.com>  2014-05-07 00:04:04 -0700
committer Dana Powers <dana.powers@gmail.com>  2014-05-07 00:04:04 -0700
commit    b47bf781eb0e96c3fef59cbe554325155062e129 (patch)
tree      c74db904c37ba1e70dab6dd2c78f4c1a4abd173f /kafka
parent    3b18043821f37242bde2b186684fa05d36c61921 (diff)
parent    b81bf5f69e24b0d0106693b6e47906669873ec18 (diff)
download  kafka-python-b47bf781eb0e96c3fef59cbe554325155062e129.tar.gz
Merge pull request #158 from wizzat/add_tests
Improve Tests, fix connection error timeout, other issues
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py       55
-rw-r--r--  kafka/common.py      124
-rw-r--r--  kafka/consumer.py     47
-rw-r--r--  kafka/partitioner.py   1
-rw-r--r--  kafka/protocol.py      9
-rw-r--r--  kafka/util.py          8
6 files changed, 156 insertions(+), 88 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 5dee7b7..d0e07d0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,16 @@
import copy
import logging
+import collections
+
+import kafka.common
-from collections import defaultdict
from functools import partial
from itertools import count
-
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
- BrokerResponseError, PartitionUnavailableError,
- LeaderUnavailableError,
- KafkaUnavailableError)
+ PartitionUnavailableError,
+ LeaderUnavailableError, KafkaUnavailableError,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -39,29 +40,23 @@ class KafkaClient(object):
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
+
##################
# Private API #
##################
def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
-
host_key = (host, port)
if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+ self.conns[host_key] = KafkaConnection(
+ host,
+ port,
+ timeout=self.timeout
+ )
return self.conns[host_key]
- def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
- return self._get_conn(broker.host, broker.port)
-
def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
@@ -99,10 +94,9 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception, e:
+ except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
- continue
raise KafkaUnavailableError("All servers failed to process request")
@@ -130,7 +124,7 @@ class KafkaClient(object):
# Group the requests by topic+partition
original_keys = []
- payloads_by_broker = defaultdict(list)
+ payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
@@ -151,7 +145,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn_for_broker(broker)
+ conn = self._get_conn(broker.host, broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -164,11 +158,11 @@ class KafkaClient(object):
continue
try:
response = conn.recv(requestId)
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -191,16 +185,11 @@ class KafkaClient(object):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
- if resp.error == ErrorMapping.NO_ERROR:
- return
-
- if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
- ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ try:
+ kafka.common.check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
self.reset_topic_metadata(resp.topic)
-
- raise BrokerResponseError(
- "Request for %s failed with errorcode=%d (%s)" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+ raise
#################
# Public API #
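
With this refactor, _raise_on_response_error delegates error-code checking to kafka.common.check_error and invalidates cached topic metadata for the two stale-metadata errors before re-raising. A minimal sketch of the retry pattern this enables on the caller side, assuming only names shown in this diff (call_with_metadata_refresh and send_fn are illustrative, not part of the commit):

import kafka.common

def call_with_metadata_refresh(client, send_fn):
    # On these two errors the client has already dropped its cached
    # topic metadata (see _raise_on_response_error above), so a reload
    # plus one retry can recover from a leader change.
    try:
        return send_fn()
    except (kafka.common.UnknownTopicOrPartitionError,
            kafka.common.NotLeaderForPartitionError):
        client.load_metadata_for_topics()
        return send_fn()
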
diff --git a/kafka/common.py b/kafka/common.py
index 005e6dd..d515532 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
-ErrorStrings = {
- -1 : 'UNKNOWN',
- 0 : 'NO_ERROR',
- 1 : 'OFFSET_OUT_OF_RANGE',
- 2 : 'INVALID_MESSAGE',
- 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
- 4 : 'INVALID_FETCH_SIZE',
- 5 : 'LEADER_NOT_AVAILABLE',
- 6 : 'NOT_LEADER_FOR_PARTITION',
- 7 : 'REQUEST_TIMED_OUT',
- 8 : 'BROKER_NOT_AVAILABLE',
- 9 : 'REPLICA_NOT_AVAILABLE',
- 10 : 'MESSAGE_SIZE_TOO_LARGE',
- 11 : 'STALE_CONTROLLER_EPOCH',
- 12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
- pass
-
-for k, v in ErrorStrings.items():
- setattr(ErrorMapping, v, k)
-
#################
# Exceptions #
#################
@@ -80,11 +57,81 @@ class KafkaError(RuntimeError):
pass
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
pass
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+ errno = -1
+ message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+ errno = 1
+ message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+ errno = 2
+ message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+ errno = 3
+ message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+ errno = 4
+ message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+ errno = 5
+ message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+ errno = 6
+ message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+ errno = 7
+ message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+ errno = 8
+ message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+ errno = 9
+ message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+ errno = 10
+ message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+ errno = 11
+ message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLargeError(BrokerResponseError):
+ errno = 12
+ message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class StaleLeaderEpochCodeError(BrokerResponseError):
+ errno = 13
+ message = 'STALE_LEADER_EPOCH_CODE'
+
+
+class KafkaUnavailableError(KafkaError):
pass
@@ -118,3 +165,30 @@ class ConsumerFetchSizeTooSmall(KafkaError):
class ConsumerNoMoreData(KafkaError):
pass
+
+
+class ProtocolError(KafkaError):
+ pass
+
+kafka_errors = {
+ -1 : UnknownError,
+ 1 : OffsetOutOfRangeError,
+ 2 : InvalidMessageError,
+ 3 : UnknownTopicOrPartitionError,
+ 4 : InvalidFetchRequestError,
+ 5 : LeaderNotAvailableError,
+ 6 : NotLeaderForPartitionError,
+ 7 : RequestTimedOutError,
+ 8 : BrokerNotAvailableError,
+ 9 : ReplicaNotAvailableError,
+ 10 : MessageSizeTooLargeError,
+ 11 : StaleControllerEpochError,
+ 12 : OffsetMetadataTooLargeError,
+ 13 : StaleLeaderEpochCodeError,
+}
+
+def check_error(response):
+ error = kafka_errors.get(response.error)
+ if error:
+ raise error(response)
+
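
The module now maps each broker error code to a typed BrokerResponseError subclass, and check_error raises the mapped exception with the failing response attached as args[0]; an error code of 0 is absent from kafka_errors, so successful responses pass through untouched. A usage sketch with a stand-in response tuple (FakeResponse is illustrative; real callers pass the *Response namedtuples defined above):

from collections import namedtuple
import kafka.common

FakeResponse = namedtuple('FakeResponse', ['topic', 'partition', 'error'])

resp = FakeResponse(topic='test-topic', partition=0, error=3)
try:
    kafka.common.check_error(resp)
except kafka.common.BrokerResponseError as e:
    # errno is a class attribute of the raised subclass, and the
    # offending response travels with the exception.
    print 'errno %d for %r' % (e.errno, e.args[0])
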
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 8ac28da..ef8fbda 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,13 +3,16 @@ from __future__ import absolute_import
from itertools import izip_longest, repeat
import logging
import time
+import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
+import kafka
from kafka.common import (
- ErrorMapping, FetchRequest,
+ FetchRequest,
OffsetRequest, OffsetCommitRequest,
+ OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -80,6 +83,8 @@ class Consumer(object):
if not partitions:
partitions = self.client.topic_partitions[topic]
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
# Variables for handling offset commits
self.commit_lock = Lock()
@@ -96,26 +101,22 @@ class Consumer(object):
self.commit_timer.start()
def get_or_init_offset_callback(resp):
- if resp.error == ErrorMapping.NO_ERROR:
+ try:
+ kafka.common.check_error(resp)
return resp.offset
- elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ except kafka.common.UnknownTopicOrPartitionError:
return 0
- else:
- raise Exception("OffsetFetchRequest for topic=%s, "
- "partition=%d failed with errorcode=%s" % (
- resp.topic, resp.partition, resp.error))
-
- # Uncomment for 0.8.1
- #
- #for partition in partitions:
- # req = OffsetFetchRequest(topic, partition)
- # (offset,) = self.client.send_offset_fetch_request(group, [req],
- # callback=get_or_init_offset_callback,
- # fail_on_error=False)
- # self.offsets[partition] = offset
- for partition in partitions:
- self.offsets[partition] = 0
+ if auto_commit:
+ for partition in partitions:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
def commit(self, partitions=None):
"""
@@ -151,7 +152,7 @@ class Consumer(object):
resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
- assert resp.error == 0
+ kafka.common.check_error(resp)
self.count_since_commit = 0
@@ -164,7 +165,7 @@ class Consumer(object):
if not self.auto_commit or self.auto_commit_every_n is None:
return
- if self.count_since_commit > self.auto_commit_every_n:
+ if self.count_since_commit >= self.auto_commit_every_n:
self.commit()
def stop(self):
@@ -429,12 +430,12 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall, e:
+ except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
- raise e
+ raise
if self.max_buffer_size is None:
self.buffer_size *= 2
else:
@@ -443,7 +444,7 @@ class SimpleConsumer(Consumer):
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", self.buffer_size)
retry_partitions.add(partition)
- except ConsumerNoMoreData, e:
+ except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition
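
The ConsumerFetchSizeTooSmall handler above doubles the fetch buffer and retries the partition, giving up only once a configured max_buffer_size has been hit. A standalone sketch of that growth policy, assuming a clamped else-branch that the hunk elides (next_fetch_size is an illustrative name):

import kafka.common

def next_fetch_size(buffer_size, max_buffer_size):
    # Unbounded: keep doubling until the message fits.
    if max_buffer_size is None:
        return buffer_size * 2
    # Bounded: re-raise once the ceiling has been reached.
    if buffer_size >= max_buffer_size:
        raise kafka.common.ConsumerFetchSizeTooSmall()
    return min(buffer_size * 2, max_buffer_size)
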
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
index 8190c34..5287cef 100644
--- a/kafka/partitioner.py
+++ b/kafka/partitioner.py
@@ -54,4 +54,5 @@ class HashedPartitioner(Partitioner):
def partition(self, key, partitions):
size = len(partitions)
idx = hash(key) % size
+
return partitions[idx]
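
HashedPartitioner assigns a key to hash(key) % len(partitions), so identical keys land on the same partition for as long as the partition list is unchanged. A usage sketch (the constructor signature follows the Partitioner base class, which this diff does not show):

from kafka.partitioner import HashedPartitioner

partitions = [0, 1, 2, 3]
partitioner = HashedPartitioner(partitions)

# Stable mapping: the same key always yields the same partition.
assert (partitioner.partition('user-42', partitions) ==
        partitioner.partition('user-42', partitions))
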
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 25be023..7ec7946 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -8,7 +8,7 @@ from kafka.codec import (
from kafka.common import (
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
ProduceResponse, FetchResponse, OffsetResponse,
- OffsetCommitResponse, OffsetFetchResponse,
+ OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
)
from kafka.util import (
@@ -50,7 +50,7 @@ class KafkaProtocol(object):
request_key, # ApiKey
0, # ApiVersion
correlation_id, # CorrelationId
- len(client_id),
+ len(client_id), # ClientId size
client_id) # ClientId
@classmethod
@@ -68,8 +68,7 @@ class KafkaProtocol(object):
message_set = ""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
- message_set += struct.pack('>qi%ds' % len(encoded_message), 0,
- len(encoded_message), encoded_message)
+ message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
return message_set
@classmethod
@@ -96,7 +95,7 @@ class KafkaProtocol(object):
crc = zlib.crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
- raise Exception("Unexpected magic number: %d" % message.magic)
+ raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@classmethod
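
The header encoder packs ApiKey, ApiVersion, CorrelationId, and a length-prefixed ClientId in big-endian order. A minimal standalone sketch of that layout (the exact format string is an assumption, since the hunk shows only the argument list):

import struct

def encode_request_header(request_key, correlation_id, client_id):
    # >h h i h %ds: ApiKey, ApiVersion, CorrelationId,
    # ClientId length, then the ClientId bytes themselves.
    return struct.pack('>hhih%ds' % len(client_id),
                       request_key,      # ApiKey
                       0,                # ApiVersion
                       correlation_id,   # CorrelationId
                       len(client_id),   # ClientId size
                       client_id)        # ClientId
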
diff --git a/kafka/util.py b/kafka/util.py
index 54052fb..a918234 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,5 +1,6 @@
-from collections import defaultdict
+import collections
import struct
+import sys
from threading import Thread, Event
from kafka.common import BufferUnderflowError
@@ -15,6 +16,9 @@ def write_int_string(s):
def write_short_string(s):
if s is None:
return struct.pack('>h', -1)
+ elif len(s) > 32767 and sys.version_info < (2,7):
+ # Python 2.6 issues a deprecation warning instead of a struct error
+ raise struct.error(len(s))
else:
return struct.pack('>h%ds' % len(s), len(s), s)
@@ -63,7 +67,7 @@ def relative_unpack(fmt, data, cur):
def group_by_topic_and_partition(tuples):
- out = defaultdict(dict)
+ out = collections.defaultdict(dict)
for t in tuples:
out[t.topic][t.partition] = t
return out
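
write_short_string length-prefixes its payload with a signed 16-bit big-endian integer and encodes None as length -1, which is why values over 32767 must be rejected explicitly. A quick check of that contract:

import struct
from kafka.util import write_short_string

# None encodes as a bare -1 length; short values get a 2-byte prefix.
assert write_short_string(None) == struct.pack('>h', -1)
assert write_short_string('hi') == struct.pack('>h2s', 2, 'hi')
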