summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Dimson <tdimson@gmail.com>2014-01-13 15:30:51 -0800
committerThomas Dimson <tdimson@gmail.com>2014-01-13 15:38:46 -0800
commit9c7b41283851735cbee5092f2923d7c8a006b89a (patch)
tree2ecf49bc2a9251378b76bc64ae1f26e4fa4c42a3
parent87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (diff)
downloadkafka-python-9c7b41283851735cbee5092f2923d7c8a006b89a.tar.gz
Exception hierarchy, invalidate more md on errors
-rw-r--r--kafka/client.py168
-rw-r--r--kafka/common.py31
-rw-r--r--kafka/consumer.py2
-rw-r--r--kafka/producer.py15
-rw-r--r--test/test_integration.py72
5 files changed, 166 insertions, 122 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 33c6d77..7e169e8 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,15 @@
import copy
+import logging
+
from collections import defaultdict
from functools import partial
from itertools import count
-import logging
-import time
-from kafka.common import (
- ErrorMapping, TopicAndPartition, ConnectionError,
- FailedPayloadsException
-)
+from kafka.common import (ErrorMapping, TopicAndPartition,
+ ConnectionError, FailedPayloadsError,
+ BrokerResponseError, PartitionUnavailableError,
+ KafkaRequestError)
+
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -29,8 +30,8 @@ class KafkaClient(object):
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
- self._load_metadata_for_topics()
+ self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
@@ -49,55 +50,13 @@ class KafkaClient(object):
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
- self._load_metadata_for_topics(topic)
+ self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
- raise Exception("Partition does not exist: %s" % str(key))
+ raise KafkaRequestError("Partition does not exist: %s" % str(key))
return self.topics_to_brokers[key]
- def _load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This method will
- recurse in the event of a retry.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
-
- response = self._send_broker_unaware_request(request_id, request)
- if response is None:
- raise Exception("All servers failed to process request")
-
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
-
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
-
- self.brokers = brokers
- self.topics_to_brokers = {}
-
- for topic, partitions in topics.items():
- # Clear the list once before we add it. This removes stale entries
- # and avoids duplicates
- self.topic_partitions.pop(topic, None)
-
- if not partitions:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- break
-
- for partition, meta in partitions.items():
- if meta.leader == -1:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- else:
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
- self.topic_partitions[topic].append(partition)
-
def _next_id(self):
"""
Generate a new correlation id
@@ -119,7 +78,7 @@ class KafkaClient(object):
"trying next server: %s" % (request, conn, e))
continue
- return None
+ raise BrokerResponseError("All servers failed to process request")
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -150,6 +109,8 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
+ if leader == -1:
+ raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition)
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -185,21 +146,51 @@ class KafkaClient(object):
if failed:
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.reset_all_metadata()
continue
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
if failed_payloads:
- raise FailedPayloadsException(failed_payloads)
+ raise FailedPayloadsError(failed_payloads)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
+ def _raise_on_response_error(self, resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return
+
+ if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
+ ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ self.reset_topic_metadata(resp.topic)
+
+ raise BrokerResponseError(
+ "Request for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error))
+
#################
# Public API #
#################
+ def reset_topic_metadata(self, *topics):
+ for topic in topics:
+ try:
+ partitions = self.topic_partitions[topic]
+ except KeyError:
+ continue
+
+ for partition in partitions:
+ self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
+
+ del self.topic_partitions[topic]
+
+ def reset_all_metadata(self):
+ self.topics_to_brokers.clear()
+ self.topic_partitions.clear()
+
+ def has_metadata_for_topic(self, topic):
+ return topic in self.topic_partitions
def close(self):
for conn in self.conns.values():
@@ -219,6 +210,36 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.reinit()
+ def load_metadata_for_topics(self, *topics):
+ """
+ Discover brokers and metadata for a set of topics. This function is called
+ lazily whenever metadata is unavailable.
+ """
+ request_id = self._next_id()
+ request = KafkaProtocol.encode_metadata_request(self.client_id,
+ request_id, topics)
+
+ response = self._send_broker_unaware_request(request_id, request)
+
+ (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
+ log.debug("Broker metadata: %s", brokers)
+ log.debug("Topic metadata: %s", topics)
+
+ self.brokers = brokers
+
+ for topic, partitions in topics.items():
+ self.reset_topic_metadata(topic)
+
+ if not partitions:
+ continue
+
+ self.topic_partitions[topic] = []
+ for partition, meta in partitions.items():
+ topic_part = TopicAndPartition(topic, partition)
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topic_partitions[topic].append(partition)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -256,14 +277,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "ProduceRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -289,14 +305,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "FetchRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -312,9 +323,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
@@ -330,9 +340,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with "
- "errorcode=%s", resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
@@ -350,9 +359,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
diff --git a/kafka/common.py b/kafka/common.py
index 6f0dd32..5bd9a96 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,23 +69,42 @@ class ErrorMapping(object):
# Exceptions #
#################
-class FailedPayloadsException(Exception):
+
+class KafkaError(RuntimeError):
+ pass
+
+
+class KafkaRequestError(KafkaError):
pass
-class ConnectionError(Exception):
+
+class BrokerResponseError(KafkaError):
+ pass
+
+
+class PartitionUnavailableError(KafkaError):
pass
-class BufferUnderflowError(Exception):
+
+class FailedPayloadsError(KafkaError):
+ pass
+
+
+class ConnectionError(KafkaError):
+ pass
+
+
+class BufferUnderflowError(KafkaError):
pass
-class ChecksumError(Exception):
+class ChecksumError(KafkaError):
pass
-class ConsumerFetchSizeTooSmall(Exception):
+class ConsumerFetchSizeTooSmall(KafkaError):
pass
-class ConsumerNoMoreData(Exception):
+class ConsumerNoMoreData(KafkaError):
pass
diff --git a/kafka/consumer.py b/kafka/consumer.py
index eba2912..522d6ca 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -76,7 +76,7 @@ class Consumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client._load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic)
self.offsets = {}
if not partitions:
diff --git a/kafka/producer.py b/kafka/producer.py
index 5aead43..6ed22ee 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,17 +1,16 @@
from __future__ import absolute_import
+import logging
+import time
+
+from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from Queue import Empty
-import logging
-import sys
-import time
from kafka.common import ProduceRequest
-from kafka.common import FailedPayloadsException
-from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
+from kafka.protocol import create_message
log = logging.getLogger("kafka")
@@ -188,7 +187,7 @@ class SimpleProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- client._load_metadata_for_topics(topic)
+ client.load_metadata_for_topics(topic)
self.next_partition = cycle(client.topic_partitions[topic])
super(SimpleProducer, self).__init__(client, async, req_acks,
@@ -225,7 +224,7 @@ class KeyedProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- client._load_metadata_for_topics(topic)
+ client.load_metadata_for_topics(topic)
if not partitioner:
partitioner = HashedPartitioner
diff --git a/test/test_integration.py b/test/test_integration.py
index eaf432d..56974a5 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -12,7 +12,23 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
-class TestKafkaClient(unittest.TestCase):
+class KafkaTestCase(unittest.TestCase):
+ def setUp(self):
+ topic_name = self.id()[self.id().rindex(".")+1:]
+ times = 0
+ while True:
+ times += 1
+ self.client.load_metadata_for_topics(topic_name)
+ if self.client.has_metadata_for_topic(topic_name):
+ break
+ print "Waiting for %s topic to be created" % topic_name
+ time.sleep(1)
+
+ if times > 30:
+ raise Exception("Unable to create topic %s" % topic_name)
+
+
+class TestKafkaClient(KafkaTestCase):
@classmethod
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
@@ -30,6 +46,7 @@ class TestKafkaClient(unittest.TestCase):
#####################
def test_produce_many_simple(self):
+
produce = ProduceRequest("test_produce_many_simple", 0, messages=[
create_message("Test message %d" % i) for i in range(100)
])
@@ -331,15 +348,15 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_hashed_partitioner(self):
- producer = KeyedProducer(self.client, "test_hash_partitioner",
+ producer = KeyedProducer(self.client, "test_hashed_partitioner",
partitioner=HashedPartitioner)
producer.send(1, "one")
producer.send(2, "two")
producer.send(3, "three")
producer.send(4, "four")
- fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024)
- fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024)
+ fetch1 = FetchRequest("test_hashed_partitioner", 0, 0, 1024)
+ fetch2 = FetchRequest("test_hashed_partitioner", 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -549,7 +566,7 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
-class TestConsumer(unittest.TestCase):
+class TestConsumer(KafkaTestCase):
@classmethod
def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
@@ -648,21 +665,21 @@ class TestConsumer(unittest.TestCase):
def test_simple_consumer_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_simple_pending", 0, messages=[
+ produce1 = ProduceRequest("test_simple_consumer_pending", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_simple_pending", 1, messages=[
+ produce2 = ProduceRequest("test_simple_consumer_pending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending",
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending",
auto_commit=False, iter_timeout=0)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
@@ -671,7 +688,7 @@ class TestConsumer(unittest.TestCase):
def test_multi_process_consumer(self):
# Produce 100 messages to partition 0
- produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
+ produce1 = ProduceRequest("test_multi_process_consumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(100)
])
@@ -680,7 +697,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 100 messages to partition 1
- produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
+ produce2 = ProduceRequest("test_multi_process_consumer", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(100)
])
@@ -689,7 +706,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False)
+ consumer = MultiProcessConsumer(self.client, "grp1", "test_multi_process_consumer", auto_commit=False)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -702,11 +719,11 @@ class TestConsumer(unittest.TestCase):
start = datetime.now()
messages = consumer.get_messages(block=True, timeout=5)
diff = (datetime.now() - start).total_seconds()
- self.assertGreaterEqual(diff, 5)
+ self.assertGreaterEqual(diff, 4.9)
self.assertEqual(len(messages), 0)
# Send 10 messages
- produce = ProduceRequest("test_mpconsumer", 0, messages=[
+ produce = ProduceRequest("test_multi_process_consumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
@@ -729,7 +746,7 @@ class TestConsumer(unittest.TestCase):
def test_multi_proc_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_mppending", 0, messages=[
+ produce1 = ProduceRequest("test_multi_proc_pending", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
@@ -737,7 +754,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_mppending", 1, messages=[
+ produce2 = ProduceRequest("test_multi_proc_pending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
@@ -745,7 +762,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False)
+ consumer = MultiProcessConsumer(self.client, "group1", "test_multi_proc_pending", auto_commit=False)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -800,19 +817,20 @@ class TestConsumer(unittest.TestCase):
self.assertIsNotNone(message)
self.assertEquals(message.message.value, big_message.value)
-class TestFailover(unittest.TestCase):
- def setUp(self):
+class TestFailover(KafkaTestCase):
- zk_chroot = random_string(10)
- replicas = 2
+ def setUp(self):
+ zk_chroot = random_string(10)
+ replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- self.zk = ZookeeperFixture.instance()
- kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+ self.zk = ZookeeperFixture.instance()
+ kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+ self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+ KafkaTestCase.setUp(self)
def tearDown(self):
self.client.close()
@@ -835,7 +853,7 @@ class TestFailover(unittest.TestCase):
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
- with self.assertRaises(FailedPayloadsException):
+ with self.assertRaises(FailedPayloadsError):
producer.send_messages('part 1')
producer.send_messages('part 2')
time.sleep(1)
@@ -886,17 +904,17 @@ class TestFailover(unittest.TestCase):
resp = producer.send_messages(random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
return broker
def _count_messages(self, group, topic):
- client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+ client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer: