author      Omar <omar.ghishan@rd.io>    2014-01-14 15:22:05 -0800
committer   Omar <omar.ghishan@rd.io>    2014-01-14 15:22:05 -0800
commit      9644166048d6fe1cdbd1fc3096329ee2142b147e (patch)
tree        0fb10b5d90f1db2a07e0075c166f3fdea258dbd7
parent      4e1cfc842dbfd65f6469e0e70e5977599e544094 (diff)
parent      2818ddfa924b470b844222cfd76cbe8cdf8dcc51 (diff)
download    kafka-python-9644166048d6fe1cdbd1fc3096329ee2142b147e.tar.gz
Merge pull request #100 from cosbynator/no_infinite_loops_real
Branch fix: No infinite loops during metadata requests, invalidate metadata more, exception hierarchy
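The diff below replaces the client's internal sleep-and-recurse metadata loop with exceptions the caller handles, and moves every bare `Exception` onto a shared `KafkaError` hierarchy (`KafkaUnavailableError`, `KafkaRequestError`, `PartitionUnavailableError`, `FailedPayloadsError`, `BrokerResponseError`). A minimal caller-side sketch of the resulting pattern; the host, port, topic name, and retry count are illustrative, not part of the commit:

```python
from kafka.client import KafkaClient
from kafka.common import KafkaUnavailableError

# Hypothetical caller: retry a bounded number of times instead of
# relying on the client to loop forever internally.
client = KafkaClient("localhost", 9092)   # host/port are illustrative
for _ in range(3):                         # illustrative retry limit
    try:
        client.load_metadata_for_topics("my-topic")  # topic name is illustrative
        break
    except KafkaUnavailableError:
        # No broker could answer the metadata request; try again.
        pass
else:
    raise RuntimeError("metadata still unavailable after 3 attempts")
```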
-rw-r--r--   kafka/client.py           | 168
-rw-r--r--   kafka/common.py           |  35
-rw-r--r--   kafka/consumer.py         |   2
-rw-r--r--   kafka/producer.py         |  15
-rw-r--r--   test/test_integration.py  | 223
5 files changed, 245 insertions, 198 deletions
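On UNKNOWN_TOPIC_OR_PARTITON and NOT_LEADER_FOR_PARTITION errors, the new `_raise_on_response_error` helper drops the cached mapping for that topic via `reset_topic_metadata` before raising `BrokerResponseError`, so the next request re-resolves the leader lazily instead of looping inside the client. A hedged sketch of how a caller might lean on that behaviour; host, port, and topic are illustrative:

```python
from kafka.client import KafkaClient
from kafka.common import BrokerResponseError, ProduceRequest
from kafka.protocol import create_message

# Hypothetical caller: after a leader change the first produce may fail
# with BrokerResponseError, but the client has already invalidated the
# stale topic metadata, so a single retry re-fetches the leader on demand.
client = KafkaClient("localhost", 9092)   # host/port are illustrative
req = ProduceRequest("my-topic", 0, messages=[create_message("hello")])
try:
    client.send_produce_request([req])
except BrokerResponseError:
    client.send_produce_request([req])    # metadata is re-resolved lazily
```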
diff --git a/kafka/client.py b/kafka/client.py index 33c6d77..1016051 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,14 +1,15 @@ import copy +import logging + from collections import defaultdict from functools import partial from itertools import count -import logging -import time -from kafka.common import ( - ErrorMapping, TopicAndPartition, ConnectionError, - FailedPayloadsException -) +from kafka.common import (ErrorMapping, TopicAndPartition, + ConnectionError, FailedPayloadsError, + BrokerResponseError, PartitionUnavailableError, + KafkaUnavailableError, KafkaRequestError) + from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -29,8 +30,8 @@ class KafkaClient(object): } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] - self._load_metadata_for_topics() + self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # @@ -49,55 +50,13 @@ class KafkaClient(object): def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) if key not in self.topics_to_brokers: - self._load_metadata_for_topics(topic) + self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) + raise KafkaRequestError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - - response = self._send_broker_unaware_request(request_id, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. 
This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - break - - for partition, meta in partitions.items(): - if meta.leader == -1: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - else: - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - def _next_id(self): """ Generate a new correlation id @@ -119,7 +78,7 @@ class KafkaClient(object): "trying next server: %s" % (request, conn, e)) continue - return None + raise KafkaUnavailableError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -150,6 +109,8 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) + if leader == -1: + raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -185,21 +146,51 @@ class KafkaClient(object): if failed: failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.reset_all_metadata() continue for response in decoder_fn(response): acc[(response.topic, response.partition)] = response if failed_payloads: - raise FailedPayloadsException(failed_payloads) + raise FailedPayloadsError(failed_payloads) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def _raise_on_response_error(self, resp): + if resp.error == ErrorMapping.NO_ERROR: + return + + if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, + ErrorMapping.NOT_LEADER_FOR_PARTITION): + self.reset_topic_metadata(resp.topic) + + raise BrokerResponseError( + "Request for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), resp.error)) + ################# # Public API # ################# + def reset_topic_metadata(self, *topics): + for topic in topics: + try: + partitions = self.topic_partitions[topic] + except KeyError: + continue + + for partition in partitions: + self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) + + del self.topic_partitions[topic] + + def reset_all_metadata(self): + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + return topic in self.topic_partitions def close(self): for conn in self.conns.values(): @@ -219,6 +210,36 @@ class KafkaClient(object): for conn in self.conns.values(): conn.reinit() + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This function is called + lazily whenever metadata is unavailable. 
+ """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + + for topic, partitions in topics.items(): + self.reset_topic_metadata(topic) + + if not partitions: + continue + + self.topic_partitions[topic] = [] + for partition, meta in partitions.items(): + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -256,14 +277,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "ProduceRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -289,14 +305,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "FetchRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -312,9 +323,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: @@ -330,9 +340,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with " - "errorcode=%s", resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) @@ -350,9 +359,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/common.py b/kafka/common.py index 6f0dd32..c0a1a6a 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,23 +69,46 @@ class ErrorMapping(object): # Exceptions # ################# -class FailedPayloadsException(Exception): + +class KafkaError(RuntimeError): + pass + + +class KafkaRequestError(KafkaError): + pass + + +class KafkaUnavailableError(KafkaError): + pass + + +class BrokerResponseError(KafkaError): pass -class ConnectionError(Exception): + +class PartitionUnavailableError(KafkaError): + pass + + +class FailedPayloadsError(KafkaError): pass -class BufferUnderflowError(Exception): + +class ConnectionError(KafkaError): + pass + + +class BufferUnderflowError(KafkaError): pass -class ChecksumError(Exception): +class 
ChecksumError(KafkaError): pass -class ConsumerFetchSizeTooSmall(Exception): +class ConsumerFetchSizeTooSmall(KafkaError): pass -class ConsumerNoMoreData(Exception): +class ConsumerNoMoreData(KafkaError): pass diff --git a/kafka/consumer.py b/kafka/consumer.py index eba2912..522d6ca 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -76,7 +76,7 @@ class Consumer(object): self.client = client self.topic = topic self.group = group - self.client._load_metadata_for_topics(topic) + self.client.load_metadata_for_topics(topic) self.offsets = {} if not partitions: diff --git a/kafka/producer.py b/kafka/producer.py index 5aead43..6ed22ee 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,17 +1,16 @@ from __future__ import absolute_import +import logging +import time + +from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from Queue import Empty -import logging -import sys -import time from kafka.common import ProduceRequest -from kafka.common import FailedPayloadsException -from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -188,7 +187,7 @@ class SimpleProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) self.next_partition = cycle(client.topic_partitions[topic]) super(SimpleProducer, self).__init__(client, async, req_acks, @@ -225,7 +224,7 @@ class KeyedProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) if not partitioner: partitioner = HashedPartitioner diff --git a/test/test_integration.py b/test/test_integration.py index eaf432d..5a22630 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,7 +12,32 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture -class TestKafkaClient(unittest.TestCase): +def random_string(l): + s = "".join(random.choice(string.letters) for i in xrange(l)) + return s + + +def ensure_topic_creation(client, topic_name): + times = 0 + while True: + times += 1 + client.load_metadata_for_topics(topic_name) + if client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + + +class KafkaTestCase(unittest.TestCase): + def setUp(self): + self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) + ensure_topic_creation(self.client, self.topic) + + +class TestKafkaClient(KafkaTestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -30,7 +55,8 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_produce_many_simple(self): - produce = ProduceRequest("test_produce_many_simple", 0, messages=[ + + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -38,25 +64,25 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = 
self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 100) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 200) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 300) def test_produce_10k_simple(self): - produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(10000) ]) @@ -64,7 +90,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 10000) def test_produce_many_gzip(self): @@ -73,13 +99,13 @@ class TestKafkaClient(unittest.TestCase): message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_many_snappy(self): @@ -88,13 +114,13 @@ class TestKafkaClient(unittest.TestCase): message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_mixed(self): @@ -104,17 +130,17 @@ class TestKafkaClient(unittest.TestCase): message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2, message3]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)]) + 
(offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 201) def test_produce_100k_gzipped(self): - req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req1 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) ]) @@ -122,10 +148,10 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 50000) - req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req2 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) ]) @@ -133,7 +159,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 50000) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100000) ##################### @@ -141,18 +167,18 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_consume_none(self): - fetch = FetchRequest("test_consume_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) - self.assertEquals(fetch_resp.topic, "test_consume_none") + self.assertEquals(fetch_resp.topic, self.topic) self.assertEquals(fetch_resp.partition, 0) messages = list(fetch_resp.messages) self.assertEquals(len(messages), 0) def test_produce_consume(self): - produce = ProduceRequest("test_produce_consume", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Just a test message"), create_message("Message with a key", "foo"), ]) @@ -161,7 +187,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch = FetchRequest("test_produce_consume", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) @@ -176,7 +202,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(messages[1].message.key, "foo") def test_produce_consume_many(self): - produce = ProduceRequest("test_produce_consume_many", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -185,7 +211,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.offset, 0) # 1024 is not enough for 100 messages... 
- fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) (fetch_resp1,) = self.client.send_fetch_request([fetch1]) @@ -195,7 +221,7 @@ class TestKafkaClient(unittest.TestCase): self.assertTrue(len(messages) < 100) # 10240 should be enough - fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240) + fetch2 = FetchRequest(self.topic, 0, 0, 10240) (fetch_resp2,) = self.client.send_fetch_request([fetch2]) self.assertEquals(fetch_resp2.error, 0) @@ -208,10 +234,10 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(message.message.key, None) def test_produce_consume_two_partitions(self): - produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Partition 0 %d" % i) for i in range(10) ]) - produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Partition 1 %d" % i) for i in range(10) ]) @@ -219,8 +245,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) - fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) self.assertEquals(fetch_resp1.highwaterMark, 10) @@ -245,11 +271,11 @@ class TestKafkaClient(unittest.TestCase): @unittest.skip('commmit offset not supported in this version') def test_commit_fetch_offsets(self): - req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") + req = OffsetCommitRequest(self.topic, 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) self.assertEquals(resp.error, 0) - req = OffsetFetchRequest("test_commit_fetch_offsets", 0) + req = OffsetFetchRequest(self.topic, 0) (resp,) = self.client.send_offset_fetch_request("group", [req]) self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 42) @@ -258,7 +284,7 @@ class TestKafkaClient(unittest.TestCase): # Producer Tests def test_simple_producer(self): - producer = SimpleProducer(self.client, "test_simple_producer") + producer = SimpleProducer(self.client, self.topic) resp = producer.send_messages("one", "two") # Will go to partition 0 @@ -272,8 +298,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg - fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) @@ -297,15 +323,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_round_robin_partitioner(self): - producer = KeyedProducer(self.client, "test_round_robin_partitioner", + producer = KeyedProducer(self.client, self.topic, partitioner=RoundRobinPartitioner) producer.send("key1", "one") producer.send("key2", "two") producer.send("key3", "three") producer.send("key4", "four") - fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024) + fetch1 = 
FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -331,15 +357,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, self.topic, partitioner=HashedPartitioner) producer.send(1, "one") producer.send(2, "two") producer.send(3, "three") producer.send(4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -365,12 +391,12 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_acks_none(self): - producer = SimpleProducer(self.client, "test_acks_none", + producer = SimpleProducer(self.client, self.topic, req_acks=SimpleProducer.ACK_NOT_REQUIRED) resp = producer.send_messages("one") self.assertEquals(len(resp), 0) - fetch = FetchRequest("test_acks_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -384,12 +410,12 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_acks_local_write(self): - producer = SimpleProducer(self.client, "test_acks_local_write", + producer = SimpleProducer(self.client, self.topic, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) resp = producer.send_messages("one") self.assertEquals(len(resp), 1) - fetch = FetchRequest("test_acks_local_write", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -404,12 +430,12 @@ class TestKafkaClient(unittest.TestCase): def test_acks_cluster_commit(self): producer = SimpleProducer( - self.client, "test_acks_cluster_commit", + self.client, self.topic, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) resp = producer.send_messages("one") self.assertEquals(len(resp), 1) - fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -423,16 +449,14 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_async_simple_producer(self): - producer = SimpleProducer(self.client, "test_async_simple_producer", - async=True) - + producer = SimpleProducer(self.client, self.topic, async=True) resp = producer.send_messages("one") self.assertEquals(len(resp), 0) # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -446,8 +470,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_async_keyed_producer(self): - producer = KeyedProducer(self.client, "test_async_keyed_producer", - async=True) + producer = KeyedProducer(self.client, self.topic, async=True) resp = producer.send("key1", "one") self.assertEquals(len(resp), 0) @@ -455,7 +478,7 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = 
self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -469,7 +492,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_batched_simple_producer(self): - producer = SimpleProducer(self.client, "test_batched_simple_producer", + producer = SimpleProducer(self.client, self.topic, batch_send=True, batch_send_every_n=10, batch_send_every_t=20) @@ -484,8 +507,8 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -504,8 +527,8 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -523,8 +546,8 @@ class TestKafkaClient(unittest.TestCase): msgs = ["message-%d" % i for i in range(15, 17)] resp = producer.send_messages(*msgs) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -536,8 +559,8 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(22) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -549,7 +572,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod def setUpClass(cls): cls.zk = ZookeeperFixture.instance() @@ -566,7 +589,7 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -575,7 +598,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -585,7 +608,7 @@ class TestConsumer(unittest.TestCase): # Start a consumer consumer = SimpleConsumer(self.client, "group1", - "test_simple_consumer", auto_commit=False, + self.topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: @@ -613,7 +636,7 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer_blocking(self): consumer = SimpleConsumer(self.client, "group1", - "test_simple_consumer_blocking", + self.topic, auto_commit=False, iter_timeout=0) # Blocking API @@ -624,7 +647,7 @@ class TestConsumer(unittest.TestCase): self.assertEqual(len(messages), 0) # Send 10 messages - produce = 
ProduceRequest("test_simple_consumer_blocking", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -648,21 +671,21 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + consumer = SimpleConsumer(self.client, "group1", self.topic, auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) @@ -671,7 +694,7 @@ class TestConsumer(unittest.TestCase): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -680,7 +703,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -689,7 +712,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -702,11 +725,11 @@ class TestConsumer(unittest.TestCase): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.999) self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -729,7 +752,7 @@ class TestConsumer(unittest.TestCase): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -737,7 +760,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -745,7 +768,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) + consumer = 
MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -755,7 +778,7 @@ class TestConsumer(unittest.TestCase): def test_large_messages(self): # Produce 10 "normal" size messages messages1 = [create_message(random_string(1024)) for i in range(10)] - produce1 = ProduceRequest("test_large_messages", 0, messages1) + produce1 = ProduceRequest(self.topic, 0, messages1) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) @@ -763,14 +786,14 @@ class TestConsumer(unittest.TestCase): # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] - produce2 = ProduceRequest("test_large_messages", 0, messages2) + produce2 = ProduceRequest(self.topic, 0, messages2) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + consumer = SimpleConsumer(self.client, "group1", self.topic, auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): @@ -780,7 +803,7 @@ class TestConsumer(unittest.TestCase): # Produce 1 message that is too large (bigger than max fetch size) big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 big_message = create_message(random_string(big_message_size)) - produce3 = ProduceRequest("test_large_messages", 0, [big_message]) + produce3 = ProduceRequest(self.topic, 0, [big_message]) for resp in self.client.send_produce_request([produce3]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 20) @@ -788,7 +811,7 @@ class TestConsumer(unittest.TestCase): self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) # Create a consumer with no fetch size limit - big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + big_consumer = SimpleConsumer(self.client, "group1", self.topic, max_buffer_size=None, partitions=[0], auto_commit=False, iter_timeout=0) @@ -800,19 +823,20 @@ class TestConsumer(unittest.TestCase): self.assertIsNotNone(message) self.assertEquals(message.message.value, big_message.value) -class TestFailover(unittest.TestCase): - def setUp(self): +class TestFailover(KafkaTestCase): - zk_chroot = random_string(10) - replicas = 2 + def setUp(self): + zk_chroot = random_string(10) + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + super(TestFailover, self).setUp() def tearDown(self): self.client.close() @@ -821,8 +845,7 @@ class TestFailover(unittest.TestCase): self.zk.close() def test_switch_leader(self): - - key, topic, partition = random_string(5), 'test_switch_leader', 0 + key, topic, partition = random_string(5), self.topic, 0 producer = SimpleProducer(self.client, topic) for i in range(1, 4): @@ -835,7 +858,7 @@ class TestFailover(unittest.TestCase): 
broker = self._kill_leader(topic, partition) # expect failure, reload meta data - with self.assertRaises(FailedPayloadsException): + with self.assertRaises(FailedPayloadsError): producer.send_messages('part 1') producer.send_messages('part 2') time.sleep(1) @@ -853,8 +876,7 @@ class TestFailover(unittest.TestCase): producer.stop() def test_switch_leader_async(self): - - key, topic, partition = random_string(5), 'test_switch_leader_async', 0 + key, topic, partition = random_string(5), self.topic, 0 producer = SimpleProducer(self.client, topic, async=True) for i in range(1, 4): @@ -886,17 +908,17 @@ class TestFailover(unittest.TestCase): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(self.brokers[0].host, self.brokers[0].port) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: @@ -905,11 +927,6 @@ class TestFailover(unittest.TestCase): client.close() return len(all_messages) - -def random_string(l): - s = "".join(random.choice(string.letters) for i in xrange(l)) - return s - if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) unittest.main() |
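The integration tests' new `ensure_topic_creation` helper shows the intended use of the public metadata API added here: refresh metadata explicitly with `load_metadata_for_topics()` and check it with `has_metadata_for_topic()`, rather than having the client sleep and recurse internally. A self-contained sketch of the same polling pattern; the timeout and the commented-out host, port, and topic are assumptions for illustration:

```python
import time

from kafka.client import KafkaClient


def wait_for_topic(client, topic, timeout=30):
    """Poll broker metadata until `topic` is known, or give up.

    Mirrors the test helper in this commit: refresh metadata explicitly
    and check it, instead of relying on the client to block internally.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        client.load_metadata_for_topics(topic)
        if client.has_metadata_for_topic(topic):
            return True
        time.sleep(1)  # brokers may still be electing a leader
    return False


# Usage (host, port, and topic are illustrative):
# client = KafkaClient("localhost", 9092)
# if not wait_for_topic(client, "my-topic"):
#     raise RuntimeError("topic never appeared")
```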