Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py      |  55
-rw-r--r-- | kafka/common.py      | 124
-rw-r--r-- | kafka/consumer.py    |  47
-rw-r--r-- | kafka/partitioner.py |   1
-rw-r--r-- | kafka/protocol.py    |   9
-rw-r--r-- | kafka/util.py        |   8
6 files changed, 156 insertions, 88 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 5dee7b7..d0e07d0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,16 @@
 import copy
 import logging
+import collections
+
+import kafka.common
 
-from collections import defaultdict
 from functools import partial
 from itertools import count
-
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
-                          BrokerResponseError, PartitionUnavailableError,
-                          LeaderUnavailableError,
-                          KafkaUnavailableError)
+                          PartitionUnavailableError,
+                          LeaderUnavailableError, KafkaUnavailableError,
+                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -39,29 +40,23 @@ class KafkaClient(object):
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()  # bootstrap with all metadata
 
+
     ##################
     #   Private API  #
     ##################
 
     def _get_conn(self, host, port):
         "Get or create a connection to a broker using host and port"
-
         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+            self.conns[host_key] = KafkaConnection(
+                host,
+                port,
+                timeout=self.timeout
+            )
+
         return self.conns[host_key]
 
-    def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
-        return self._get_conn(broker.host, broker.port)
-
     def _get_leader_for_partition(self, topic, partition):
         """
         Returns the leader for a partition or None if the partition exists
@@ -99,10 +94,9 @@ class KafkaClient(object):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception, e:
+            except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
                             "trying next server: %s" % (request, host, port, e))
-                continue
 
         raise KafkaUnavailableError("All servers failed to process request")
 
@@ -130,7 +124,7 @@
 
         # Group the requests by topic+partition
         original_keys = []
-        payloads_by_broker = defaultdict(list)
+        payloads_by_broker = collections.defaultdict(list)
 
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
@@ -151,7 +145,7 @@
 
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
-            conn = self._get_conn_for_broker(broker)
+            conn = self._get_conn(broker.host, broker.port)
             requestId = self._next_id()
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
@@ -164,11 +158,11 @@
                     continue
                 try:
                     response = conn.recv(requestId)
-                except ConnectionError, e:
+                except ConnectionError as e:
                     log.warning("Could not receive response to request [%s] "
                                 "from server %s: %s", request, conn, e)
                     failed = True
-            except ConnectionError, e:
+            except ConnectionError as e:
                 log.warning("Could not send request [%s] to server %s: %s",
                             request, conn, e)
                 failed = True
@@ -191,16 +185,11 @@
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
     def _raise_on_response_error(self, resp):
-        if resp.error == ErrorMapping.NO_ERROR:
-            return
-
-        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
-                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+        try:
+            kafka.common.check_error(resp)
+        except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
             self.reset_topic_metadata(resp.topic)
-
-        raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d (%s)" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+            raise
 
     #################
     #   Public API  #
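The rewritten _raise_on_response_error delegates errno translation to kafka.common.check_error (added in kafka/common.py below) and intercepts only the two metadata-invalidating errors before re-raising. A minimal sketch of that flow, assuming the patched kafka.common is importable; the ProduceResponse literal is just a convenient stand-in for any response carrying an error field:

import kafka.common
from kafka.common import ProduceResponse, NotLeaderForPartitionError

# errno 6 is NOT_LEADER_FOR_PARTITION, one of the two errors that should
# invalidate cached topic metadata before propagating to the caller.
resp = ProduceResponse(topic="my-topic", partition=0, error=6, offset=-1)
try:
    kafka.common.check_error(resp)
except NotLeaderForPartitionError:
    # in KafkaClient._raise_on_response_error this is where
    # self.reset_topic_metadata(resp.topic) runs, followed by a bare raise
    pass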
diff --git a/kafka/common.py b/kafka/common.py
index 005e6dd..d515532 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
 
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
-ErrorStrings = {
-    -1 : 'UNKNOWN',
-     0 : 'NO_ERROR',
-     1 : 'OFFSET_OUT_OF_RANGE',
-     2 : 'INVALID_MESSAGE',
-     3 : 'UNKNOWN_TOPIC_OR_PARTITON',
-     4 : 'INVALID_FETCH_SIZE',
-     5 : 'LEADER_NOT_AVAILABLE',
-     6 : 'NOT_LEADER_FOR_PARTITION',
-     7 : 'REQUEST_TIMED_OUT',
-     8 : 'BROKER_NOT_AVAILABLE',
-     9 : 'REPLICA_NOT_AVAILABLE',
-    10 : 'MESSAGE_SIZE_TOO_LARGE',
-    11 : 'STALE_CONTROLLER_EPOCH',
-    12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
-    pass
-
-for k, v in ErrorStrings.items():
-    setattr(ErrorMapping, v, k)
-
 #################
 #   Exceptions  #
 #################
@@ -80,11 +57,81 @@ class KafkaError(RuntimeError):
     pass
 
 
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
     pass
 
 
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+    errno = -1
+    message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+    errno = 1
+    message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+    errno = 2
+    message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+    errno = 3
+    message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+    errno = 4
+    message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+    errno = 5
+    message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+    errno = 6
+    message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+    errno = 7
+    message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+    errno = 8
+    message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+    errno = 9
+    message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+    errno = 10
+    message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+    errno = 11
+    message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLargeError(BrokerResponseError):
+    errno = 12
+    message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class StaleLeaderEpochCodeError(BrokerResponseError):
+    errno = 13
+    message = 'STALE_LEADER_EPOCH_CODE'
+
+
+class KafkaUnavailableError(KafkaError):
     pass
 
 
@@ -118,3 +165,30 @@ class ConsumerFetchSizeTooSmall(KafkaError):
     pass
 
 class ConsumerNoMoreData(KafkaError):
     pass
+
+
+class ProtocolError(KafkaError):
+    pass
+
+kafka_errors = {
+    -1 : UnknownError,
+     1 : OffsetOutOfRangeError,
+     2 : InvalidMessageError,
+     3 : UnknownTopicOrPartitionError,
+     4 : InvalidFetchRequestError,
+     5 : LeaderNotAvailableError,
+     6 : NotLeaderForPartitionError,
+     7 : RequestTimedOutError,
+     8 : BrokerNotAvailableError,
+     9 : ReplicaNotAvailableError,
+    10 : MessageSizeTooLargeError,
+    11 : StaleControllerEpochError,
+    12 : OffsetMetadataTooLargeError,
+    13 : StaleLeaderEpochCodeError,
+}
+
+def check_error(response):
+    error = kafka_errors.get(response.error)
+    if error:
+        raise error(response)
+
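Note how check_error doubles as the success check: errno 0 has no entry in kafka_errors, so the .get() returns None and nothing is raised — and the same silent fall-through applies to any nonzero code missing from the table. A small sketch, using a hypothetical stand-in namedtuple for the response:

from collections import namedtuple
from kafka.common import check_error, OffsetOutOfRangeError

# Stand-in for a protocol response; real callers pass response objects
# that carry an `error` field.
FakeResponse = namedtuple("FakeResponse", ["topic", "partition", "error"])

check_error(FakeResponse("t", 0, 0))   # errno 0: no table entry, returns None
try:
    check_error(FakeResponse("t", 0, 1))
except OffsetOutOfRangeError as e:
    assert e.errno == 1                # errno/message live on the class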
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 8ac28da..ef8fbda 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,13 +3,16 @@ from __future__ import absolute_import
 from itertools import izip_longest, repeat
 import logging
 import time
+import numbers
 from threading import Lock
 from multiprocessing import Process, Queue as MPQueue, Event, Value
 from Queue import Empty, Queue
 
+import kafka
 from kafka.common import (
-    ErrorMapping, FetchRequest,
+    FetchRequest,
     OffsetRequest, OffsetCommitRequest,
+    OffsetFetchRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
 
@@ -80,6 +83,8 @@ class Consumer(object):
 
         if not partitions:
             partitions = self.client.topic_partitions[topic]
+        else:
+            assert all(isinstance(x, numbers.Integral) for x in partitions)
 
         # Variables for handling offset commits
         self.commit_lock = Lock()
@@ -96,26 +101,22 @@ class Consumer(object):
             self.commit_timer.start()
 
         def get_or_init_offset_callback(resp):
-            if resp.error == ErrorMapping.NO_ERROR:
+            try:
+                kafka.common.check_error(resp)
                 return resp.offset
-            elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+            except kafka.common.UnknownTopicOrPartitionError:
                 return 0
-            else:
-                raise Exception("OffsetFetchRequest for topic=%s, "
-                                "partition=%d failed with errorcode=%s" % (
-                                resp.topic, resp.partition, resp.error))
-
-        # Uncomment for 0.8.1
-        #
-        #for partition in partitions:
-        #    req = OffsetFetchRequest(topic, partition)
-        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                  callback=get_or_init_offset_callback,
-        #                  fail_on_error=False)
-        #    self.offsets[partition] = offset
 
-        for partition in partitions:
-            self.offsets[partition] = 0
+        if auto_commit:
+            for partition in partitions:
+                req = OffsetFetchRequest(topic, partition)
+                (offset,) = self.client.send_offset_fetch_request(group, [req],
+                              callback=get_or_init_offset_callback,
+                              fail_on_error=False)
+                self.offsets[partition] = offset
+        else:
+            for partition in partitions:
+                self.offsets[partition] = 0
 
     def commit(self, partitions=None):
         """
@@ -151,7 +152,7 @@
 
             resps = self.client.send_offset_commit_request(self.group, reqs)
             for resp in resps:
-                assert resp.error == 0
+                kafka.common.check_error(resp)
 
             self.count_since_commit = 0
@@ -164,7 +165,7 @@
         if not self.auto_commit or self.auto_commit_every_n is None:
             return
 
-        if self.count_since_commit > self.auto_commit_every_n:
+        if self.count_since_commit >= self.auto_commit_every_n:
             self.commit()
 
     def stop(self):
@@ -429,12 +430,12 @@
                     # Put the message in our queue
                     self.queue.put((partition, message))
                     self.fetch_offsets[partition] = message.offset + 1
-            except ConsumerFetchSizeTooSmall, e:
+            except ConsumerFetchSizeTooSmall:
                 if (self.max_buffer_size is not None and
                         self.buffer_size == self.max_buffer_size):
                     log.error("Max fetch size %d too small",
                               self.max_buffer_size)
-                    raise e
+                    raise
                 if self.max_buffer_size is None:
                     self.buffer_size *= 2
                 else:
@@ -443,7 +444,7 @@
                     log.warn("Fetch size too small, increase to %d (2x) "
                              "and retry", self.buffer_size)
                 retry_partitions.add(partition)
-            except ConsumerNoMoreData, e:
+            except ConsumerNoMoreData as e:
                 log.debug("Iteration was ended by %r", e)
             except StopIteration:
                 # Stop iterating through this partition
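The comparison fix in _auto_commit removes an off-by-one: with auto_commit_every_n = 100, the old strict > deferred the commit until the 101st message. A standalone illustration of the counter logic (hypothetical helper, not part of the patch):

# Hypothetical helper mirroring the fixed test in Consumer._auto_commit.
def should_commit(count_since_commit, auto_commit_every_n=100):
    return count_since_commit >= auto_commit_every_n

assert not should_commit(99)
assert should_commit(100)  # the old `>` comparison was still False here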
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
index 8190c34..5287cef 100644
--- a/kafka/partitioner.py
+++ b/kafka/partitioner.py
@@ -54,4 +54,5 @@ class HashedPartitioner(Partitioner):
     def partition(self, key, partitions):
         size = len(partitions)
         idx = hash(key) % size
+
         return partitions[idx]
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 25be023..7ec7946 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -8,7 +8,7 @@ from kafka.codec import (
 from kafka.common import (
     BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
     ProduceResponse, FetchResponse, OffsetResponse,
-    OffsetCommitResponse, OffsetFetchResponse,
+    OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
     BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
 )
 from kafka.util import (
@@ -50,7 +50,7 @@ class KafkaProtocol(object):
                            request_key,          # ApiKey
                            0,                    # ApiVersion
                            correlation_id,       # CorrelationId
-                           len(client_id),
+                           len(client_id),       # ClientId size
                            client_id)            # ClientId
 
     @classmethod
@@ -68,8 +68,7 @@ class KafkaProtocol(object):
         message_set = ""
         for message in messages:
             encoded_message = KafkaProtocol._encode_message(message)
-            message_set += struct.pack('>qi%ds' % len(encoded_message), 0,
-                                       len(encoded_message), encoded_message)
+            message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
         return message_set
 
     @classmethod
@@ -96,7 +95,7 @@ class KafkaProtocol(object):
             crc = zlib.crc32(msg)
             msg = struct.pack('>i%ds' % len(msg), crc, msg)
         else:
-            raise Exception("Unexpected magic number: %d" % message.magic)
+            raise ProtocolError("Unexpected magic number: %d" % message.magic)
         return msg
 
     @classmethod
diff --git a/kafka/util.py b/kafka/util.py
index 54052fb..a918234 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,5 +1,6 @@
-from collections import defaultdict
+import collections
 import struct
+import sys
 from threading import Thread, Event
 
 from kafka.common import BufferUnderflowError
@@ -15,6 +16,9 @@ def write_int_string(s):
 
 def write_short_string(s):
     if s is None:
         return struct.pack('>h', -1)
+    elif len(s) > 32767 and sys.version < (2,7):
+        # Python 2.6 issues a deprecation warning instead of a struct error
+        raise struct.error(len(s))
     else:
         return struct.pack('>h%ds' % len(s), len(s), s)
 
@@ -63,7 +67,7 @@ def relative_unpack(fmt, data, cur):
 
 
 def group_by_topic_and_partition(tuples):
-    out = defaultdict(dict)
+    out = collections.defaultdict(dict)
     for t in tuples:
         out[t.topic][t.partition] = t
     return out
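For context on the new guard in write_short_string: the length prefix is a signed 16-bit int ('>h'), so 32767 is the longest encodable string — Python 2.7's struct raises on the overflow itself, while 2.6 only issued a warning, which the new branch normalizes. A self-contained sketch of the wire format (hypothetical encode_short_string mirroring kafka.util.write_short_string, minus the 2.6 workaround):

import struct

def encode_short_string(s):
    # Kafka "short string": signed 16-bit big-endian length, then the bytes;
    # a length of -1 encodes a null string.
    if s is None:
        return struct.pack('>h', -1)
    return struct.pack('>h%ds' % len(s), len(s), s)

assert encode_short_string(b'hi') == b'\x00\x02hi'
assert encode_short_string(None) == b'\xff\xff'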