-rw-r--r--  kafka/client.py                    | 219
-rw-r--r--  kafka/common.py                    |  46
-rw-r--r--  kafka/consumer.py                  |  11
-rw-r--r--  kafka/producer.py                  |  19
-rw-r--r--  kafka/protocol.py                  |  45
-rw-r--r--  test/test_client.py                | 297
-rw-r--r--  test/test_producer_integration.py  |   6
-rw-r--r--  test/test_protocol.py              |  73
8 files changed, 479 insertions(+), 237 deletions(-)
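The heart of this change is a new metadata contract in KafkaClient: instead of returning None (or raising the now-removed PartitionUnavailableError / LeaderUnavailableError), metadata lookups raise UnknownTopicOrPartitionError or LeaderNotAvailableError. A minimal caller-side sketch of the new contract follows; the broker address localhost:9092 and the topic name my-topic are hypothetical examples, and the auto-create branch only applies when the broker is configured to auto-create topics:

from kafka import KafkaClient
from kafka.common import (LeaderNotAvailableError,
                          UnknownTopicOrPartitionError)

client = KafkaClient(hosts='localhost:9092')  # bootstraps all metadata

try:
    # Naming a topic explicitly makes metadata errors raise
    client.load_metadata_for_topics('my-topic')
except UnknownTopicOrPartitionError:
    # Broker does not auto-create topics; retrying will not help
    raise
except LeaderNotAvailableError:
    # Topic is being auto-created; wait until it has partitions
    client.ensure_topic_exists('my-topic', timeout=30)

if client.has_metadata_for_topic('my-topic'):
    print(client.get_partition_ids_for_topic('my-topic'))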
diff --git a/kafka/client.py b/kafka/client.py
index a918091..8c78694 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,11 +7,11 @@ import logging
 import time
 import kafka.common
 
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
-                          PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
-                          KafkaTimeoutError,
-                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+                          KafkaTimeoutError, KafkaUnavailableError,
+                          LeaderNotAvailableError, UnknownTopicOrPartitionError,
+                          NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -37,8 +37,9 @@ class KafkaClient(object):
         # create connections only when we need them
         self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {}  # topic_id -> broker_id
-        self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
+        self.topics_to_brokers = {}  # TopicAndPartition -> BrokerMetadata
+        self.topic_partitions = {}   # topic -> partition -> PartitionMetadata
+
         self.load_metadata_for_topics()  # bootstrap with all metadata
@@ -63,20 +64,37 @@ class KafkaClient(object):
         Returns the leader for a partition or None if the partition exists
         but has no leader.
 
-        PartitionUnavailableError will be raised if the topic or partition
+        UnknownTopicOrPartitionError will be raised if the topic or partition
         is not part of the metadata.
+
+        LeaderNotAvailableError is raised if server has metadata, but there is
+        no current leader
         """
 
         key = TopicAndPartition(topic, partition)
 
-        # reload metadata whether the partition is not available
-        # or has no leader (broker is None)
-        if self.topics_to_brokers.get(key) is None:
-            self.load_metadata_for_topics(topic)
 
-        if key not in self.topics_to_brokers:
-            raise PartitionUnavailableError("%s not available" % str(key))
+        # Use cached metadata if it is there
+        if self.topics_to_brokers.get(key) is not None:
+            return self.topics_to_brokers[key]
+
+        # Otherwise refresh metadata
+
+        # If topic does not already exist, this will raise
+        # UnknownTopicOrPartitionError if not auto-creating
+        # LeaderNotAvailableError otherwise until partitions are created
+        self.load_metadata_for_topics(topic)
 
-        return self.topics_to_brokers[key]
+        # If the partition doesn't actually exist, raise
+        if partition not in self.topic_partitions[topic]:
+            raise UnknownTopicOrPartitionError(key)
+
+        # If there's no leader for the partition, raise
+        meta = self.topic_partitions[topic][partition]
+        if meta.leader == -1:
+            raise LeaderNotAvailableError(meta)
+
+        # Otherwise return the BrokerMetadata
+        return self.brokers[meta.leader]
 
     def _next_id(self):
         """
@@ -84,20 +102,26 @@ class KafkaClient(object):
         """
         return next(KafkaClient.ID_GEN)
 
-    def _send_broker_unaware_request(self, requestId, request):
+    def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
         """
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
         for (host, port) in self.hosts:
+            requestId = self._next_id()
             try:
                 conn = self._get_conn(host, port)
+                request = encoder_fn(client_id=self.client_id,
+                                     correlation_id=requestId,
+                                     payloads=payloads)
+
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
-                return response
+                return decoder_fn(response)
+
             except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
-                            "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
+                            "trying next server: %s" % (requestId, host, port, e))
 
         raise KafkaUnavailableError("All servers failed to process request")
@@ -109,8 +133,8 @@ class KafkaClient(object):
 
         Params
         ======
-        payloads: list of object-like entities with a topic and
-                  partition attribute
+        payloads: list of object-like entities with a topic (str) and
+                  partition (int) attribute
         encode_fn: a method to encode the list of payloads to a request body,
                    must accept client_id, correlation_id, and payloads as
                    keyword arguments
@@ -130,10 +154,6 @@ class KafkaClient(object):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-            if leader is None:
-                raise LeaderUnavailableError(
-                    "Leader not available for topic %s partition %s" %
-                    (payload.topic, payload.partition))
 
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
@@ -195,6 +215,24 @@ class KafkaClient(object):
     #################
     #   Public API  #
     #################
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for key in c.conns:
+            c.conns[key] = self.conns[key].copy()
+        return c
+
+    def reinit(self):
+        for conn in self.conns.values():
+            conn.reinit()
+
     def reset_topic_metadata(self, *topics):
         for topic in topics:
             try:
@@ -212,70 +250,125 @@ class KafkaClient(object):
         self.topic_partitions.clear()
 
     def has_metadata_for_topic(self, topic):
-        return topic in self.topic_partitions
+        return (
+            topic in self.topic_partitions
+            and len(self.topic_partitions[topic]) > 0
+        )
+
+    def get_partition_ids_for_topic(self, topic):
+        if topic not in self.topic_partitions:
+            return None
+
+        return list(self.topic_partitions[topic])
 
     def ensure_topic_exists(self, topic, timeout = 30):
         start_time = time.time()
 
-        self.load_metadata_for_topics(topic)
         while not self.has_metadata_for_topic(topic):
             if time.time() > start_time + timeout:
                 raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
-            self.load_metadata_for_topics(topic)
+            try:
+                self.load_metadata_for_topics(topic)
+            except LeaderNotAvailableError:
+                pass
+            except UnknownTopicOrPartitionError:
+                # Server is not configured to auto-create
+                # retrying in this case will not help
+                raise
             time.sleep(.5)
 
-    def close(self):
-        for conn in self.conns.values():
-            conn.close()
-
-    def copy(self):
-        """
-        Create an inactive copy of the client object
-        A reinit() has to be done on the copy before it can be used again
+    def load_metadata_for_topics(self, *topics):
         """
-        c = copy.deepcopy(self)
-        for key in c.conns:
-            c.conns[key] = self.conns[key].copy()
-        return c
+        Fetch broker and topic-partition metadata from the server,
+        and update internal data:
+        broker list, topic/partition list, and topic/partition -> broker map
 
-    def reinit(self):
-        for conn in self.conns.values():
-            conn.reinit()
+        This method should be called after receiving any error
 
-    def load_metadata_for_topics(self, *topics):
-        """
-        Discover brokers and metadata for a set of topics. This function is called
-        lazily whenever metadata is unavailable.
-        """
-        request_id = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(self.client_id,
-                                                        request_id, topics)
+        @param: *topics (optional)
+        If a list of topics is provided, the metadata refresh will be limited
+        to the specified topics only.
+
+        Exceptions:
+        ----------
+        If the broker is configured to not auto-create topics,
+        expect UnknownTopicOrPartitionError for topics that don't exist
 
-        response = self._send_broker_unaware_request(request_id, request)
+        If the broker is configured to auto-create topics,
+        expect LeaderNotAvailableError for new topics
+        until partitions have been initialized.
 
-        (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+        Exceptions *will not* be raised in a full refresh (i.e. no topic list)
+        In this case, error codes will be logged as errors
 
-        log.debug("Broker metadata: %s", brokers)
-        log.debug("Topic metadata: %s", topics)
+        Partition-level errors will also not be raised here
+        (a single partition w/o a leader, for example)
+        """
+        resp = self.send_metadata_request(topics)
+
+        log.debug("Broker metadata: %s", resp.brokers)
+        log.debug("Topic metadata: %s", resp.topics)
+
+        self.brokers = dict([(broker.nodeId, broker)
+                             for broker in resp.brokers])
 
-        self.brokers = brokers
+        for topic_metadata in resp.topics:
+            topic = topic_metadata.topic
+            partitions = topic_metadata.partitions
 
-        for topic, partitions in topics.items():
             self.reset_topic_metadata(topic)
 
-            if not partitions:
-                log.warning('No partitions for %s', topic)
+            # Errors expected for new topics
+            try:
+                kafka.common.check_error(topic_metadata)
+            except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+                # Raise if the topic was passed in explicitly
+                if topic in topics:
+                    raise
+
+                # Otherwise, just log an error
+                log.error("Error loading topic metadata for %s: %s", topic, type(e))
                 continue
 
-            self.topic_partitions[topic] = []
-            for partition, meta in partitions.items():
-                self.topic_partitions[topic].append(partition)
+            self.topic_partitions[topic] = {}
+            for partition_metadata in partitions:
+                partition = partition_metadata.partition
+                leader = partition_metadata.leader
+
+                self.topic_partitions[topic][partition] = partition_metadata
+
+                # Populate topics_to_brokers dict
                 topic_part = TopicAndPartition(topic, partition)
-                if meta.leader == -1:
-                    log.warning('No leader for topic %s partition %s', topic, partition)
+
+                # Check for partition errors
+                try:
+                    kafka.common.check_error(partition_metadata)
+
+                # If No Leader, topics_to_brokers topic_partition -> None
+                except LeaderNotAvailableError:
+                    log.error('No leader for topic %s partition %d', topic, partition)
                     self.topics_to_brokers[topic_part] = None
+                    continue
+
+                # If Known Broker, topic_partition -> BrokerMetadata
+                if leader in self.brokers:
+                    self.topics_to_brokers[topic_part] = self.brokers[leader]
+
+                # If Unknown Broker, fake BrokerMetadata so we don't lose the id
+                # (not sure how this could happen. server could be in bad state)
                 else:
-                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
+                    self.topics_to_brokers[topic_part] = BrokerMetadata(
+                        leader, None, None
+                    )
+
+    def send_metadata_request(self, payloads=[], fail_on_error=True,
+                              callback=None):
+
+        encoder = KafkaProtocol.encode_metadata_request
+        decoder = KafkaProtocol.decode_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
 
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
diff --git a/kafka/common.py b/kafka/common.py
index 907e128..008736c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
 OffsetCommitRequest = namedtuple("OffsetCommitRequest",
                                  ["topic", "partition", "offset", "metadata"])
 
+MetadataRequest = namedtuple("MetadataRequest",
+                             ["topics"])
+
 OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 
+MetadataResponse = namedtuple("MetadataResponse",
+                              ["brokers", "topics"])
+
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse",
                              ["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
                                  ["topic", "partition", "offset",
                                   "metadata", "error"])
 
-BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata",
-                               ["topic", "partition", "leader",
-                                "replicas", "isr"])
 
 # Other useful structs
-OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
-Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+BrokerMetadata = namedtuple("BrokerMetadata",
+                            ["nodeId", "host", "port"])
+
+TopicMetadata = namedtuple("TopicMetadata",
+                           ["topic", "error", "partitions"])
+
+PartitionMetadata = namedtuple("PartitionMetadata",
+    ["topic", "partition", "leader", "replicas", "isr", "error"])
+
+OffsetAndMessage = namedtuple("OffsetAndMessage",
+                              ["offset", "message"])
+
+Message = namedtuple("Message",
+                     ["magic", "attributes", "key", "value"])
+
+TopicAndPartition = namedtuple("TopicAndPartition",
+                               ["topic", "partition"])
@@ -60,6 +76,9 @@ class KafkaError(RuntimeError):
 class BrokerResponseError(KafkaError):
     pass
 
+class NoError(BrokerResponseError):
+    errno = 0
+    message = 'SUCCESS'
 
 class UnknownError(BrokerResponseError):
     errno = -1
@@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError):
     pass
 
 
-class LeaderUnavailableError(KafkaError):
-    pass
-
-
-class PartitionUnavailableError(KafkaError):
-    pass
-
-
 class FailedPayloadsError(KafkaError):
     pass
@@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError):
 
 kafka_errors = {
     -1 : UnknownError,
+     0 : NoError,
      1 : OffsetOutOfRangeError,
      2 : InvalidMessageError,
      3 : UnknownTopicOrPartitionError,
@@ -198,7 +210,7 @@ kafka_errors = {
 
 def check_error(response):
-    error = kafka_errors.get(response.error)
-    if error:
+    error = kafka_errors.get(response.error, UnknownError)
+    if error is not NoError:
         raise error(response)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fa1b8bc..42628e1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -91,7 +91,7 @@ class Consumer(object):
         self.offsets = {}
 
         if not partitions:
-            partitions = self.client.topic_partitions[topic]
+            partitions = self.client.get_partition_ids_for_topic(topic)
         else:
             assert all(isinstance(x, numbers.Integral) for x in partitions)
 
@@ -117,9 +117,9 @@ class Consumer(object):
     def fetch_last_known_offsets(self, partitions=None):
         if not partitions:
-            partitions = self.client.topic_partitions[self.topic]
+            partitions = self.client.get_partition_ids_for_topic(self.topic)
 
-        def get_or_init_offset_callback(resp):
+        def get_or_init_offset(resp):
             try:
                 kafka.common.check_error(resp)
                 return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
 
         for partition in partitions:
             req = OffsetFetchRequest(self.topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(self.group, [req],
-                                                              callback=get_or_init_offset_callback,
+            (resp,) = self.client.send_offset_fetch_request(self.group, [req],
                                                               fail_on_error=False)
-            self.offsets[partition] = offset
+            self.offsets[partition] = get_or_init_offset(resp)
         self.fetch_offsets = self.offsets.copy()
 
     def commit(self, partitions=None):
diff --git a/kafka/producer.py b/kafka/producer.py
index 4a04b38..f186649 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -247,16 +247,14 @@ class SimpleProducer(Producer):
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
-            if topic not in self.client.topic_partitions:
+            if not self.client.has_metadata_for_topic(topic):
                 self.client.load_metadata_for_topics(topic)
-            try:
-                self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
-            except KeyError:
-                raise UnknownTopicOrPartitionError(topic)
+
+            self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
 
             # Randomize the initial partition that is returned
             if self.random_start:
-                num_partitions = len(self.client.topic_partitions[topic])
+                num_partitions = len(self.client.get_partition_ids_for_topic(topic))
                 for _ in xrange(random.randint(0, num_partitions-1)):
                     next(self.partition_cycles[topic])
 
@@ -305,12 +303,13 @@ class KeyedProducer(Producer):
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
-            if topic not in self.client.topic_partitions:
+            if not self.client.has_metadata_for_topic(topic):
                 self.client.load_metadata_for_topics(topic)
-            self.partitioners[topic] = \
-                self.partitioner_class(self.client.topic_partitions[topic])
+
+            self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
         partitioner = self.partitioners[topic]
-        return partitioner.partition(key, self.client.topic_partitions[topic])
+        return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
 
     def send(self, topic, key, msg):
         partition = self._next_partition(topic, key)
diff --git a/kafka/protocol.py b/kafka/protocol.py
index e5356c5..9e01f5a 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -9,11 +9,12 @@ from kafka.codec import (
     gzip_encode, gzip_decode, snappy_encode, snappy_decode
 )
 from kafka.common import (
-    BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
-    ProduceResponse, FetchResponse, OffsetResponse,
-    OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
-    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
-    UnsupportedCodecError
+    Message, OffsetAndMessage, TopicAndPartition,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
+    MetadataResponse, ProduceResponse, FetchResponse,
+    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+    ProtocolError, BufferUnderflowError, ChecksumError,
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -343,7 +344,8 @@ class KafkaProtocol(object):
             yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
-    def encode_metadata_request(cls, client_id, correlation_id, topics=None):
+    def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+                                payloads=None):
         """
         Encode a MetadataRequest
 
@@ -353,7 +355,11 @@ class KafkaProtocol(object):
         correlation_id: int
         topics: list of strings
         """
-        topics = [] if topics is None else topics
+        if payloads is None:
+            topics = [] if topics is None else topics
+        else:
+            topics = payloads
+
         message = cls._encode_message_header(client_id, correlation_id,
                                              KafkaProtocol.METADATA_KEY)
 
@@ -376,28 +382,24 @@ class KafkaProtocol(object):
         ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
 
         # Broker info
-        brokers = {}
+        brokers = []
         for i in range(numbrokers):
             ((nodeId, ), cur) = relative_unpack('>i', data, cur)
             (host, cur) = read_short_string(data, cur)
             ((port,), cur) = relative_unpack('>i', data, cur)
-            brokers[nodeId] = BrokerMetadata(nodeId, host, port)
+            brokers.append(BrokerMetadata(nodeId, host, port))
 
         # Topic info
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
-        topic_metadata = {}
+        topic_metadata = []
 
         for i in range(num_topics):
-            # NOTE: topic_error is discarded. Should probably be returned with
-            # the topic metadata.
             ((topic_error,), cur) = relative_unpack('>h', data, cur)
             (topic_name, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            partition_metadata = {}
+            partition_metadata = []
 
             for j in range(num_partitions):
-                # NOTE: partition_error_code is discarded. Should probably be
-                # returned with the partition metadata.
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)
 
@@ -407,13 +409,16 @@ class KafkaProtocol(object):
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
 
-                partition_metadata[partition] = \
-                    PartitionMetadata(
-                        topic_name, partition, leader, replicas, isr)
+                partition_metadata.append(
+                    PartitionMetadata(topic_name, partition, leader,
+                                      replicas, isr, partition_error_code)
+                )
 
-            topic_metadata[topic_name] = partition_metadata
+            topic_metadata.append(
+                TopicMetadata(topic_name, topic_error, partition_metadata)
+            )
 
-        return brokers, topic_metadata
+        return MetadataResponse(brokers, topic_metadata)
 
     @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
diff --git a/test/test_client.py b/test/test_client.py
index 274655e..ba6e3b1 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,15 +1,17 @@
 import socket
 from time import sleep
 
-from mock import MagicMock, patch
+from mock import ANY, MagicMock, patch
 import six
 from . import unittest
 
 from kafka import KafkaClient
 from kafka.common import (
-    ProduceRequest, BrokerMetadata, PartitionMetadata,
+    ProduceRequest, MetadataResponse,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
     TopicAndPartition, KafkaUnavailableError,
-    LeaderUnavailableError, PartitionUnavailableError,
+    LeaderNotAvailableError, NoError,
+    UnknownTopicOrPartitionError, KafkaTimeoutError,
     ConnectionError
 )
 from kafka.conn import KafkaConnection
@@ -17,6 +19,10 @@ from kafka.protocol import KafkaProtocol, create_message
 
 from test.testutil import Timer
 
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
 class TestKafkaClient(unittest.TestCase):
     def test_init_with_list(self):
         with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -64,10 +70,12 @@ class TestKafkaClient(unittest.TestCase):
         req = KafkaProtocol.encode_metadata_request(b'client', 0)
 
         with self.assertRaises(KafkaUnavailableError):
-            client._send_broker_unaware_request(1, req)
+            client._send_broker_unaware_request(payloads=['fake request'],
+                                                encoder_fn=MagicMock(return_value='fake encoded message'),
+                                                decoder_fn=lambda x: x)
 
         for key, conn in six.iteritems(mocked_conns):
-            conn.send.assert_called_with(1, req)
+            conn.send.assert_called_with(ANY, 'fake encoded message')
 
     def test_send_broker_unaware_request(self):
         'Tests that call works when at least one of the host is available'
@@ -88,40 +96,46 @@ class TestKafkaClient(unittest.TestCase):
 
         # patch to avoid making requests before we want it
         with patch.object(KafkaClient, 'load_metadata_for_topics'):
             with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
-                client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+                with patch.object(KafkaClient, '_next_id', return_value=1):
+                    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
 
-                req = KafkaProtocol.encode_metadata_request(b'client', 0)
-                resp = client._send_broker_unaware_request(1, req)
+                    resp = client._send_broker_unaware_request(payloads=['fake request'],
+                                                               encoder_fn=MagicMock(),
+                                                               decoder_fn=lambda x: x)
 
-                self.assertEqual('valid response', resp)
-                mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+                    self.assertEqual('valid response', resp)
+                    mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_load_metadata(self, protocol, conn):
-        "Load metadata for all topics"
 
         conn.recv.return_value = 'response'  # anything but None
 
-        brokers = {}
-        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_1'] = {
-            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
-        }
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        topics['topic_no_partitions'] = {}
-        topics['topic_3'] = {
-            0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
-            1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
-            2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_1', NO_ERROR, [
+                PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
+            ]),
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [],
+                                  NO_LEADER),
+                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                                  NO_LEADER),
+            ]),
+            TopicMetadata('topic_no_partitions', NO_LEADER, []),
+            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_3', NO_ERROR, [
+                PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+                PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+                PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         # client loads metadata at init
         client = KafkaClient(hosts=['broker_1:4567'])
@@ -134,6 +148,78 @@ class TestKafkaClient(unittest.TestCase):
             TopicAndPartition('topic_3', 2): brokers[0]},
             client.topics_to_brokers)
 
+        # if we ask for metadata explicitly, it should raise errors
+        with self.assertRaises(LeaderNotAvailableError):
+            client.load_metadata_for_topics('topic_no_partitions')
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client.load_metadata_for_topics('topic_unknown')
+
+        # This should not raise
+        client.load_metadata_for_topics('topic_noleader')
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_has_metadata_for_topic(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_still_creating', NO_LEADER, []),
+            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_noleaders', NO_ERROR, [
+                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        # Topics with no partitions return False
+        self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
+        self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
+
+        # Topic with partition metadata, but no leaders return True
+        self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_ensure_topic_exists(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_still_creating', NO_LEADER, []),
+            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_noleaders', NO_ERROR, [
+                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
+
+        with self.assertRaises(KafkaTimeoutError):
+            client.ensure_topic_exists('topic_still_creating', timeout=1)
+
+        # This should not raise
+        client.ensure_topic_exists('topic_noleaders', timeout=1)
+
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
@@ -141,70 +227,84 @@ class TestKafkaClient(unittest.TestCase):
 
         conn.recv.return_value = 'response'  # anything but None
 
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
 
-        topics = {'topic_no_partitions': {}}
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_no_partitions', NO_LEADER, [])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         client = KafkaClient(hosts=['broker_1:4567'])
 
         # topic metadata is loaded but empty
         self.assertDictEqual({}, client.topics_to_brokers)
 
-        topics['topic_no_partitions'] = {
-            0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_one_partition', NO_ERROR, [
+                PartitionMetadata('topic_one_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         # calling _get_leader_for_partition (from any broker aware request)
         # will try loading metadata again for the same topic
-        leader = client._get_leader_for_partition('topic_no_partitions', 0)
+        leader = client._get_leader_for_partition('topic_one_partition', 0)
 
         self.assertEqual(brokers[0], leader)
         self.assertDictEqual({
-            TopicAndPartition('topic_no_partitions', 0): brokers[0]},
+            TopicAndPartition('topic_one_partition', 0): brokers[0]},
             client.topics_to_brokers)
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_for_unassigned_partitions(self, protocol, conn):
-        "Get leader raises if no partitions is defined for a topic"
 
        conn.recv.return_value = 'response'  # anything but None
 
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
 
-        topics = {'topic_no_partitions': {}}
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        topics = [
+            TopicMetadata('topic_no_partitions', NO_LEADER, []),
+            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         client = KafkaClient(hosts=['broker_1:4567'])
 
         self.assertDictEqual({}, client.topics_to_brokers)
 
-        with self.assertRaises(PartitionUnavailableError):
+        with self.assertRaises(LeaderNotAvailableError):
             client._get_leader_for_partition('topic_no_partitions', 0)
 
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client._get_leader_for_partition('topic_unknown', 0)
+
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
-    def test_get_leader_returns_none_when_noleader(self, protocol, conn):
-        "Getting leader for partitions returns None when the partiion has no leader"
+    def test_get_leader_exceptions_when_noleader(self, protocol, conn):
 
         conn.recv.return_value = 'response'  # anything but None
 
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [],
+                                  NO_LEADER),
+                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                                  NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         client = KafkaClient(hosts=['broker_1:4567'])
         self.assertDictEqual(
@@ -213,34 +313,48 @@ class TestKafkaClient(unittest.TestCase):
                 TopicAndPartition('topic_noleader', 1): None
             },
             client.topics_to_brokers)
-        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
-        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
 
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
-            1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        # No leader partitions -- raise LeaderNotAvailableError
+        with self.assertRaises(LeaderNotAvailableError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+        with self.assertRaises(LeaderNotAvailableError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+        # Unknown partitions -- raise UnknownTopicOrPartitionError
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
+
+        topics = [
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+                PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
         self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
         self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_send_produce_request_raises_when_noleader(self, protocol, conn):
-        "Send producer request raises LeaderUnavailableError if leader is not available"
+        "Send producer request raises LeaderNotAvailableError if leader is not available"
 
         conn.recv.return_value = 'response'  # anything but None
 
-        brokers = {}
-        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
-        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
-
-        topics = {}
-        topics['topic_noleader'] = {
-            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
-            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
-        }
-        protocol.decode_metadata_response.return_value = (brokers, topics)
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [],
+                                  NO_LEADER),
+                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                                  NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         client = KafkaClient(hosts=['broker_1:4567'])
 
@@ -248,7 +362,32 @@ class TestKafkaClient(unittest.TestCase):
             "topic_noleader", 0,
             [create_message("a"), create_message("b")])]
 
-        with self.assertRaises(LeaderUnavailableError):
+        with self.assertRaises(LeaderNotAvailableError):
+            client.send_produce_request(requests)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        requests = [ProduceRequest(
+            "topic_doesnt_exist", 0,
+            [create_message("a"), create_message("b")])]
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
             client.send_produce_request(requests)
 
     def test_timeout(self):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 125df2c..71fd26c 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -9,7 +9,8 @@ from kafka import (
 )
 from kafka.codec import has_snappy
 from kafka.common import (
-    FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+    FetchRequest, ProduceRequest,
+    UnknownTopicOrPartitionError, LeaderNotAvailableError
 )
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -165,7 +166,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer = SimpleProducer(self.client)
 
         # At first it doesn't exist
-        with self.assertRaises(UnknownTopicOrPartitionError):
+        with self.assertRaises((UnknownTopicOrPartitionError,
+                                LeaderNotAvailableError)):
             producer.send_messages(new_topic, self.msg("one"))
 
     @kafka_versions("all")
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 11d4687..a028be9 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -10,9 +10,10 @@ from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
-    BrokerMetadata, PartitionMetadata, ProtocolError,
-    UnsupportedCodecError
+    ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+    ProtocolError
 )
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase):
 
         self.assertEqual(encoded, expected)
 
-    def _create_encoded_metadata_response(self, broker_data, topic_data,
-                                          topic_errors, partition_errors):
-        encoded = struct.pack('>ii', 3, len(broker_data))
-        for node_id, broker in six.iteritems(broker_data):
-            encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+    def _create_encoded_metadata_response(self, brokers, topics):
+        encoded = struct.pack('>ii', 3, len(brokers))
+        for broker in brokers:
+            encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
                                    len(broker.host), broker.host, broker.port)
 
-        encoded += struct.pack('>i', len(topic_data))
-        for topic, partitions in six.iteritems(topic_data):
-            encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
-                                   len(topic), topic, len(partitions))
-            for partition, metadata in six.iteritems(partitions):
+        encoded += struct.pack('>i', len(topics))
+        for topic in topics:
+            encoded += struct.pack('>hh%dsi' % len(topic.topic),
+                                   topic.error, len(topic.topic),
+                                   topic.topic, len(topic.partitions))
+            for metadata in topic.partitions:
                 encoded += struct.pack('>hiii',
-                                       partition_errors[(topic, partition)],
-                                       partition, metadata.leader,
+                                       metadata.error,
+                                       metadata.partition,
+                                       metadata.leader,
                                        len(metadata.replicas))
                 if len(metadata.replicas) > 0:
                     encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase):
                 if len(metadata.isr) > 0:
                     encoded += struct.pack('>%di' % len(metadata.isr),
                                            *metadata.isr)
-
         return encoded
 
     def test_decode_metadata_response(self):
-        node_brokers = {
-            0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
-            1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
-            3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
-        }
-
-        topic_partitions = {
-            b"topic1": {
-                0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
-                1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
-            },
-            b"topic2": {
-                0: PartitionMetadata(b"topic2", 0, 0, (), ())
-            }
-        }
-        topic_errors = {b"topic1": 0, b"topic2": 1}
-        partition_errors = {
-            (b"topic1", 0): 0,
-            (b"topic1", 1): 1,
-            (b"topic2", 0): 0
-        }
+        node_brokers = [
+            BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
+            BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
+            BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
+        ]
+
+        topic_partitions = [
+            TopicMetadata(b"topic1", 0, [
+                PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
+                PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
+            ]),
+            TopicMetadata(b"topic2", 1, [
+                PartitionMetadata(b"topic2", 0, 0, (), (), 0),
+            ]),
+        ]
         encoded = self._create_encoded_metadata_response(node_brokers,
-                                                         topic_partitions,
-                                                         topic_errors,
-                                                         partition_errors)
+                                                         topic_partitions)
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))
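To illustrate the check_error() semantics the refactor leans on, here is a small sketch built from the structs above. It assumes kafka-python at this revision is importable, and that error code 5 maps to LeaderNotAvailableError in kafka_errors (which the test constant NO_LEADER = 5 relies on); the topic name new_topic is a hypothetical example:

import kafka.common
from kafka.common import (LeaderNotAvailableError, PartitionMetadata,
                          TopicMetadata)

# error code 0 maps to NoError, so check_error() returns silently
healthy = PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0)
kafka.common.check_error(healthy)

# error code 5 (leader not available) now raises instead of being discarded
creating = TopicMetadata('new_topic', 5, [])
try:
    kafka.common.check_error(creating)
except LeaderNotAvailableError as e:
    print('leader not ready yet: %r' % (e.args,))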