-rw-r--r--   kafka/producer/base.py             |  20
-rw-r--r--   test/fixtures.py                   |  13
-rw-r--r--   test/service.py                    |   3
-rw-r--r--   test/test_conn.py                  |   5
-rw-r--r--   test/test_consumer_integration.py  |  28
-rw-r--r--   test/test_failover_integration.py  |  10
-rw-r--r--   test/test_producer.py              |  86
-rw-r--r--   test/testutil.py                   |   5
-rw-r--r--   tox.ini                            |   2
9 files changed, 133 insertions, 39 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e0c086b..824ef5d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -41,6 +41,8 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True
 STOP_ASYNC_PRODUCER = -1
 ASYNC_STOP_TIMEOUT_SECS = 30
 
+SYNC_FAIL_ON_ERROR_DEFAULT = True
+
 
 def _send_upstream(queue, client, codec, batch_time, batch_size,
                    req_acks, ack_timeout, retry_options, stop_event,
@@ -144,10 +146,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
             retry_state['do_refresh'] |= True
 
-        reply = client.send_produce_request(request_tries.keys(),
+        requests = list(request_tries.keys())
+        reply = client.send_produce_request(requests,
                                             acks=req_acks,
                                             timeout=ack_timeout,
                                             fail_on_error=False)
+
         for i, response in enumerate(reply):
             error_cls = None
             if isinstance(response, FailedPayloadsError):
@@ -156,7 +160,7 @@
 
             elif isinstance(response, ProduceResponse) and response.error:
                 error_cls = kafka_errors.get(response.error, UnknownError)
-                orig_req = request_tries.keys()[i]
+                orig_req = requests[i]
 
             if error_cls:
                 _handle_error(error_cls, orig_req)
@@ -214,6 +218,9 @@ class Producer(object):
             defaults to 1 (local ack).
         ack_timeout (int, optional): millisecond timeout to wait for the
             configured req_acks, defaults to 1000.
+        sync_fail_on_error (bool, optional): whether sync producer should
+            raise exceptions (True), or just return errors (False),
+            defaults to True.
         async (bool, optional): send message using a background thread,
             defaults to False.
         batch_send_every_n (int, optional): If async is True, messages are
@@ -256,6 +263,7 @@ class Producer(object):
                  req_acks=ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=DEFAULT_ACK_TIMEOUT,
                  codec=None,
+                 sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
                  async=False,
                  batch_send=False,  # deprecated, use async
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
@@ -314,6 +322,8 @@ class Producer(object):
                     obj.stop()
             self._cleanup_func = cleanup
             atexit.register(cleanup, self)
+        else:
+            self.sync_fail_on_error = sync_fail_on_error
 
     def send_messages(self, topic, partition, *msg):
         """
@@ -371,8 +381,10 @@ class Producer(object):
         messages = create_message_set([(m, key) for m in msg], self.codec, key)
         req = ProduceRequest(topic, partition, messages)
         try:
-            resp = self.client.send_produce_request([req], acks=self.req_acks,
-                                                    timeout=self.ack_timeout)
+            resp = self.client.send_produce_request(
+                [req], acks=self.req_acks, timeout=self.ack_timeout,
+                fail_on_error=self.sync_fail_on_error
+            )
         except Exception:
             log.exception("Unable to send messages")
             raise
diff --git a/test/fixtures.py b/test/fixtures.py
index 4231452..d4d03ee 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -122,11 +122,11 @@ class ZookeeperFixture(Fixture):
         # Configure Zookeeper child process
         args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
         env = self.kafka_run_class_env()
-        self.child = SpawnedService(args, env)
 
         # Party!
         self.out("Starting...")
         while True:
+            self.child = SpawnedService(args, env)
             self.child.start()
             if self.child.wait_for(r"binding to port", timeout=5):
                 break
@@ -202,11 +202,6 @@ class KafkaFixture(Fixture):
         properties = os.path.join(self.tmp_dir, "kafka.properties")
         self.render_template(template, properties, vars(self))
 
-        # Configure Kafka child process
-        args = self.kafka_run_class_args("kafka.Kafka", properties)
-        env = self.kafka_run_class_env()
-        self.child = SpawnedService(args, env)
-
         # Party!
         self.out("Creating Zookeeper chroot node...")
         args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
@@ -225,7 +220,13 @@ class KafkaFixture(Fixture):
 
         self.out("Done!")
         self.out("Starting...")
+
+        # Configure Kafka child process
+        args = self.kafka_run_class_args("kafka.Kafka", properties)
+        env = self.kafka_run_class_env()
+
         while True:
+            self.child = SpawnedService(args, env)
             self.child.start()
             if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
                 break
diff --git a/test/service.py b/test/service.py
index 9368b85..b986a71 100644
--- a/test/service.py
+++ b/test/service.py
@@ -59,7 +59,8 @@ class SpawnedService(threading.Thread):
         self.alive = True
 
     def _despawn(self):
-        self.child.terminate()
+        if self.child.poll() is None:
+            self.child.terminate()
         self.alive = False
         for _ in range(50):
             if self.child.poll() is not None:
diff --git a/test/test_conn.py b/test/test_conn.py
index c4f219b..6e47cc8 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,3 +1,4 @@
+import logging
 import socket
 import struct
 from threading import Thread
@@ -10,6 +11,10 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
 
 class ConnTest(unittest.TestCase):
     def setUp(self):
+
+        # kafka.conn debug logging is verbose, so only enable in conn tests
+        logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
+
         self.config = {
             'host': 'localhost',
             'port': 9090,
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c202c5c..8911e3e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def test_simple_consumer_blocking(self):
         consumer = self.consumer()
 
-        # Ask for 5 messages, nothing in queue, block 5 seconds
+        # Ask for 5 messages, nothing in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=5)
+            messages = consumer.get_messages(block=True, timeout=1)
         self.assert_message_count(messages, 0)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         self.send_messages(0, range(0, 10))
 
@@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             self.assert_message_count(messages, 5)
         self.assertLessEqual(t.interval, 1)
 
-        # Ask for 10 messages, get 5 back, block 5 seconds
+        # Ask for 10 messages, get 5 back, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            messages = consumer.get_messages(count=10, block=True, timeout=1)
         self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         consumer.stop()
 
@@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def test_multi_process_consumer_blocking(self):
         consumer = self.consumer(consumer = MultiProcessConsumer)
 
-        # Ask for 5 messages, No messages in queue, block 5 seconds
+        # Ask for 5 messages, No messages in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=5)
+            messages = consumer.get_messages(block=True, timeout=1)
         self.assert_message_count(messages, 0)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         # Send 10 messages
         self.send_messages(0, range(0, 10))
 
@@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             self.assert_message_count(messages, 5)
         self.assertLessEqual(t.interval, 1)
 
-        # Ask for 10 messages, 5 in queue, block 5 seconds
+        # Ask for 10 messages, 5 in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            messages = consumer.get_messages(count=10, block=True, timeout=1)
         self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 4.95)
+        self.assertGreaterEqual(t.interval, 1)
 
         consumer.stop()
 
@@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer = self.kafka_consumer(auto_offset_reset='smallest',
                                        consumer_timeout_ms=TIMEOUT_MS)
 
-        # Ask for 5 messages, nothing in queue, block 5 seconds
+        # Ask for 5 messages, nothing in queue, block 500ms
         with Timer() as t:
             with self.assertRaises(ConsumerTimeout):
                 msg = consumer.next()
@@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(messages), 5)
         self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
 
-        # Ask for 10 messages, get 5 back, block 5 seconds
+        # Ask for 10 messages, get 5 back, block 500ms
         messages = set()
         with Timer() as t:
             with self.assertRaises(ConsumerTimeout):
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 5082d7c..91e22cf 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase):
 
         # Test the base class Producer -- send_messages to a specific partition
         producer = Producer(self.client, async=True,
-                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+                            batch_send_every_n=15,
+                            batch_send_every_t=3,
+                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+                            async_log_messages_on_error=False)
 
         # Send 10 random messages
         self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition + 1, 10)
 
         # kill leader for partition
         self._kill_leader(topic, partition)
@@ -110,9 +114,11 @@
 
         # in async mode, this should return immediately
         producer.send_messages(topic, partition, b'success')
+        producer.send_messages(topic, partition + 1, b'success')
 
         # send to new leader
         self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition + 1, 10)
 
         # Stop the producer and wait for it to shutdown
         producer.stop()
@@ -129,6 +135,8 @@
         # Should be equal to 10 before + 1 recovery + 10 after
         self.assert_message_count(topic, 21, partitions=(partition,),
                                   at_least=True)
+        self.assert_message_count(topic, 21, partitions=(partition + 1,),
+                                  at_least=True)
 
     @kafka_versions("all")
     def test_switch_leader_keyed_producer(self):
diff --git a/test/test_producer.py b/test/test_producer.py
index c12af02..27272f6 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,15 +1,18 @@
 # -*- coding: utf-8 -*-
 
-import time
+import collections
 import logging
+import time
 
 from mock import MagicMock, patch
 from . import unittest
 
-from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
-from kafka.common import AsyncProducerQueueFull
-from kafka.producer.base import Producer
-from kafka.producer.base import _send_upstream
+from kafka import KafkaClient, SimpleProducer
+from kafka.common import (
+    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+    ProduceResponse, RetryOptions, TopicAndPartition
+)
+from kafka.producer.base import Producer, _send_upstream
 from kafka.protocol import CODEC_NONE
 
 import threading
@@ -42,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase):
             producer.send_messages(topic, partition, m)
 
     def test_topic_message_types(self):
-        from kafka.producer.simple import SimpleProducer
-
         client = MagicMock()
 
         def partitions(topic):
@@ -73,6 +74,23 @@
         for _ in xrange(producer.queue.qsize()):
             producer.queue.get()
 
+    def test_producer_sync_fail_on_error(self):
+        error = FailedPayloadsError('failure')
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+                with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+
+                    client = KafkaClient(MagicMock())
+                    producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+                    # This should not raise
+                    (response,) = producer.send_messages('foobar', b'test message')
+                    self.assertEqual(response, error)
+
+                    producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+                    with self.assertRaises(FailedPayloadsError):
+                        producer.send_messages('foobar', b'test message')
+
 
 class TestKafkaProducerSendUpstream(unittest.TestCase):
 
@@ -122,12 +140,21 @@
         for i in range(10):
             self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
 
+        # Mock offsets counter for closure
+        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
         self.client.is_first_time = True
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
                 return [FailedPayloadsError(req) for req in reqs]
-            return []
+            responses = []
+            for req in reqs:
+                offset = offsets[req.topic][req.partition]
+                offsets[req.topic][req.partition] += len(req.messages)
+                responses.append(
+                    ProduceResponse(req.topic, req.partition, 0, offset)
+                )
+            return responses
 
         self.client.send_produce_request.side_effect = send_side_effect
 
@@ -136,8 +163,8 @@
         # the queue should be void at the end of the test
         self.assertEqual(self.queue.empty(), True)
 
-        # there should be 5 non-void cals: 1st failed batch of 3 msgs
-        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+        # there should be 5 non-void calls: 1st failed batch of 3 msgs
+        # plus 3 batches of 3 msgs each + 1 batch of 1 message
         self.assertEqual(self.client.send_produce_request.call_count, 5)
 
     def test_with_limited_retries(self):
@@ -157,11 +184,46 @@
         # the queue should be void at the end of the test
         self.assertEqual(self.queue.empty(), True)
 
-        # there should be 16 non-void cals:
+        # there should be 16 non-void calls:
         # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
-        # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+        # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
         self.assertEqual(self.client.send_produce_request.call_count, 16)
 
+    def test_async_producer_not_leader(self):
+
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+        # Mock offsets counter for closure
+        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+        self.client.is_first_time = True
+        def send_side_effect(reqs, *args, **kwargs):
+            if self.client.is_first_time:
+                self.client.is_first_time = False
+                return [ProduceResponse(req.topic, req.partition,
+                                        NotLeaderForPartitionError.errno, -1)
+                        for req in reqs]
+
+            responses = []
+            for req in reqs:
+                offset = offsets[req.topic][req.partition]
+                offsets[req.topic][req.partition] += len(req.messages)
+                responses.append(
+                    ProduceResponse(req.topic, req.partition, 0, offset)
+                )
+            return responses
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(2)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 5 non-void calls: 1st failed batch of 3 msgs
+        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+        self.assertEqual(self.client.send_produce_request.call_count, 5)
+
     def tearDown(self):
         for _ in xrange(self.queue.qsize()):
             self.queue.get()
diff --git a/test/testutil.py b/test/testutil.py
index 99d8d01..3a1d2ba 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -113,3 +113,8 @@ class Timer(object):
         self.interval = self.end - self.start
 
 logging.basicConfig(level=logging.DEBUG)
+logging.getLogger('test.fixtures').setLevel(logging.ERROR)
+logging.getLogger('test.service').setLevel(logging.ERROR)
+
+# kafka.conn debug logging is verbose, disable in tests by default
+logging.getLogger('kafka.conn').setLevel(logging.INFO)
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@ deps =
     mock
    python-snappy
 commands =
-    nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
+    nosetests {posargs:-v -x --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
 setenv =
     PROJECT_ROOT = {toxinidir}
 passenv = KAFKA_VERSION
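
For context, a minimal usage sketch of the new sync_fail_on_error flag, based only on the API as it appears in this diff; the broker address 'localhost:9092' and topic 'my-topic' are placeholders, not part of the change:

    from kafka import KafkaClient, SimpleProducer
    from kafka.common import FailedPayloadsError, ProduceResponse

    # Placeholder broker address; point at a real cluster to run this.
    client = KafkaClient('localhost:9092')

    # Default behavior (sync_fail_on_error=True): broker-level failures
    # surface as exceptions from send_messages().
    producer = SimpleProducer(client, async=False)
    try:
        producer.send_messages('my-topic', b'hello')
    except FailedPayloadsError:
        pass  # caller must catch the exception to handle the failure

    # With sync_fail_on_error=False, send_messages() returns error objects
    # in-line with any successful ProduceResponses, so the caller can
    # inspect each result and decide what to retry.
    producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
    for resp in producer.send_messages('my-topic', b'hello'):
        if isinstance(resp, ProduceResponse) and resp.error == 0:
            print('stored at offset %d' % resp.offset)
        else:
            print('send failed: %r' % (resp,))

Returning errors rather than raising lets a caller inspect partial failures across partitions; test_producer_sync_fail_on_error above exercises both settings.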