diff options
author | Thomas Dimson <tdimson@gmail.com> | 2014-01-13 15:30:51 -0800 |
---|---|---|
committer | Thomas Dimson <tdimson@gmail.com> | 2014-01-13 15:38:46 -0800 |
commit | 9c7b41283851735cbee5092f2923d7c8a006b89a (patch) | |
tree | 2ecf49bc2a9251378b76bc64ae1f26e4fa4c42a3 | |
parent | 87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (diff) | |
download | kafka-python-9c7b41283851735cbee5092f2923d7c8a006b89a.tar.gz |
Exception hierarchy, invalidate more md on errors
-rw-r--r-- | kafka/client.py | 168 | ||||
-rw-r--r-- | kafka/common.py | 31 | ||||
-rw-r--r-- | kafka/consumer.py | 2 | ||||
-rw-r--r-- | kafka/producer.py | 15 | ||||
-rw-r--r-- | test/test_integration.py | 72 |
5 files changed, 166 insertions, 122 deletions
diff --git a/kafka/client.py b/kafka/client.py index 33c6d77..7e169e8 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,14 +1,15 @@ import copy +import logging + from collections import defaultdict from functools import partial from itertools import count -import logging -import time -from kafka.common import ( - ErrorMapping, TopicAndPartition, ConnectionError, - FailedPayloadsException -) +from kafka.common import (ErrorMapping, TopicAndPartition, + ConnectionError, FailedPayloadsError, + BrokerResponseError, PartitionUnavailableError, + KafkaRequestError) + from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -29,8 +30,8 @@ class KafkaClient(object): } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] - self._load_metadata_for_topics() + self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # @@ -49,55 +50,13 @@ class KafkaClient(object): def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) if key not in self.topics_to_brokers: - self._load_metadata_for_topics(topic) + self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) + raise KafkaRequestError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - - response = self._send_broker_unaware_request(request_id, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - break - - for partition, meta in partitions.items(): - if meta.leader == -1: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - else: - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - def _next_id(self): """ Generate a new correlation id @@ -119,7 +78,7 @@ class KafkaClient(object): "trying next server: %s" % (request, conn, e)) continue - return None + raise BrokerResponseError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -150,6 +109,8 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) + if leader == -1: + raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -185,21 +146,51 @@ class KafkaClient(object): if failed: failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.reset_all_metadata() continue for response in decoder_fn(response): acc[(response.topic, response.partition)] = response if failed_payloads: - raise FailedPayloadsException(failed_payloads) + raise FailedPayloadsError(failed_payloads) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def _raise_on_response_error(self, resp): + if resp.error == ErrorMapping.NO_ERROR: + return + + if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, + ErrorMapping.NOT_LEADER_FOR_PARTITION): + self.reset_topic_metadata(resp.topic) + + raise BrokerResponseError( + "Request for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), resp.error)) + ################# # Public API # ################# + def reset_topic_metadata(self, *topics): + for topic in topics: + try: + partitions = self.topic_partitions[topic] + except KeyError: + continue + + for partition in partitions: + self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) + + del self.topic_partitions[topic] + + def reset_all_metadata(self): + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + return topic in self.topic_partitions def close(self): for conn in self.conns.values(): @@ -219,6 +210,36 @@ class KafkaClient(object): for conn in self.conns.values(): conn.reinit() + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This function is called + lazily whenever metadata is unavailable. + """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + + for topic, partitions in topics.items(): + self.reset_topic_metadata(topic) + + if not partitions: + continue + + self.topic_partitions[topic] = [] + for partition, meta in partitions.items(): + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -256,14 +277,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "ProduceRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -289,14 +305,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "FetchRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -312,9 +323,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: @@ -330,9 +340,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with " - "errorcode=%s", resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) @@ -350,9 +359,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/common.py b/kafka/common.py index 6f0dd32..5bd9a96 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,23 +69,42 @@ class ErrorMapping(object): # Exceptions # ################# -class FailedPayloadsException(Exception): + +class KafkaError(RuntimeError): + pass + + +class KafkaRequestError(KafkaError): pass -class ConnectionError(Exception): + +class BrokerResponseError(KafkaError): + pass + + +class PartitionUnavailableError(KafkaError): pass -class BufferUnderflowError(Exception): + +class FailedPayloadsError(KafkaError): + pass + + +class ConnectionError(KafkaError): + pass + + +class BufferUnderflowError(KafkaError): pass -class ChecksumError(Exception): +class ChecksumError(KafkaError): pass -class ConsumerFetchSizeTooSmall(Exception): +class ConsumerFetchSizeTooSmall(KafkaError): pass -class ConsumerNoMoreData(Exception): +class ConsumerNoMoreData(KafkaError): pass diff --git a/kafka/consumer.py b/kafka/consumer.py index eba2912..522d6ca 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -76,7 +76,7 @@ class Consumer(object): self.client = client self.topic = topic self.group = group - self.client._load_metadata_for_topics(topic) + self.client.load_metadata_for_topics(topic) self.offsets = {} if not partitions: diff --git a/kafka/producer.py b/kafka/producer.py index 5aead43..6ed22ee 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,17 +1,16 @@ from __future__ import absolute_import +import logging +import time + +from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from Queue import Empty -import logging -import sys -import time from kafka.common import ProduceRequest -from kafka.common import FailedPayloadsException -from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -188,7 +187,7 @@ class SimpleProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) self.next_partition = cycle(client.topic_partitions[topic]) super(SimpleProducer, self).__init__(client, async, req_acks, @@ -225,7 +224,7 @@ class KeyedProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) if not partitioner: partitioner = HashedPartitioner diff --git a/test/test_integration.py b/test/test_integration.py index eaf432d..56974a5 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,7 +12,23 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture -class TestKafkaClient(unittest.TestCase): +class KafkaTestCase(unittest.TestCase): + def setUp(self): + topic_name = self.id()[self.id().rindex(".")+1:] + times = 0 + while True: + times += 1 + self.client.load_metadata_for_topics(topic_name) + if self.client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + + +class TestKafkaClient(KafkaTestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -30,6 +46,7 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_produce_many_simple(self): + produce = ProduceRequest("test_produce_many_simple", 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -331,15 +348,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, "test_hashed_partitioner", partitioner=HashedPartitioner) producer.send(1, "one") producer.send(2, "two") producer.send(3, "three") producer.send(4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest("test_hashed_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_hashed_partitioner", 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -549,7 +566,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod def setUpClass(cls): cls.zk = ZookeeperFixture.instance() @@ -648,21 +665,21 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending", auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) @@ -671,7 +688,7 @@ class TestConsumer(unittest.TestCase): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -680,7 +697,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest("test_multi_process_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -689,7 +706,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "grp1", "test_multi_process_consumer", auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -702,11 +719,11 @@ class TestConsumer(unittest.TestCase): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.9) self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -729,7 +746,7 @@ class TestConsumer(unittest.TestCase): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest("test_multi_proc_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -737,7 +754,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest("test_multi_proc_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -745,7 +762,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", "test_multi_proc_pending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -800,19 +817,20 @@ class TestConsumer(unittest.TestCase): self.assertIsNotNone(message) self.assertEquals(message.message.value, big_message.value) -class TestFailover(unittest.TestCase): - def setUp(self): +class TestFailover(KafkaTestCase): - zk_chroot = random_string(10) - replicas = 2 + def setUp(self): + zk_chroot = random_string(10) + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + KafkaTestCase.setUp(self) def tearDown(self): self.client.close() @@ -835,7 +853,7 @@ class TestFailover(unittest.TestCase): broker = self._kill_leader(topic, partition) # expect failure, reload meta data - with self.assertRaises(FailedPayloadsException): + with self.assertRaises(FailedPayloadsError): producer.send_messages('part 1') producer.send_messages('part 2') time.sleep(1) @@ -886,17 +904,17 @@ class TestFailover(unittest.TestCase): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(self.brokers[0].host, self.brokers[0].port) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: |