-rw-r--r--   kafka/producer/base.py              20
-rw-r--r--   test/fixtures.py                    13
-rw-r--r--   test/service.py                      3
-rw-r--r--   test/test_conn.py                    5
-rw-r--r--   test/test_consumer_integration.py   28
-rw-r--r--   test/test_failover_integration.py   10
-rw-r--r--   test/test_producer.py               86
-rw-r--r--   test/testutil.py                     5
-rw-r--r--   tox.ini                              2
9 files changed, 133 insertions, 39 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e0c086b..824ef5d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -41,6 +41,8 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30
+SYNC_FAIL_ON_ERROR_DEFAULT = True
+
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
@@ -144,10 +146,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
- reply = client.send_produce_request(request_tries.keys(),
+ requests = list(request_tries.keys())
+ reply = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
+
for i, response in enumerate(reply):
error_cls = None
if isinstance(response, FailedPayloadsError):
@@ -156,7 +160,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
- orig_req = request_tries.keys()[i]
+ orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)
@@ -214,6 +218,9 @@ class Producer(object):
defaults to 1 (local ack).
ack_timeout (int, optional): millisecond timeout to wait for the
configured req_acks, defaults to 1000.
+ sync_fail_on_error (bool, optional): whether sync producer should
+ raise exceptions (True), or just return errors (False),
+ defaults to True.
async (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
@@ -256,6 +263,7 @@ class Producer(object):
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
+ sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
@@ -314,6 +322,8 @@ class Producer(object):
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
+ else:
+ self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
"""
@@ -371,8 +381,10 @@ class Producer(object):
messages = create_message_set([(m, key) for m in msg], self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ resp = self.client.send_produce_request(
+ [req], acks=self.req_acks, timeout=self.ack_timeout,
+ fail_on_error=self.sync_fail_on_error
+ )
except Exception:
log.exception("Unable to send messages")
raise
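
A side note on the retry-loop change above: on Python 3, dict.keys() returns a
view object that cannot be indexed, so the old request_tries.keys()[i] lookup
would raise TypeError; materializing the keys once with
requests = list(request_tries.keys()) fixes that and reuses the same ordered
list when mapping each response back to its request. The new
sync_fail_on_error flag, in turn, lets a synchronous producer report failures
as return values instead of exceptions. A minimal usage sketch against the API
in this commit (the broker address and topic name are placeholders, and a
reachable broker is assumed):

    from kafka import KafkaClient, SimpleProducer
    from kafka.common import FailedPayloadsError

    client = KafkaClient('localhost:9092')

    # Default (sync_fail_on_error=True): a failed produce raises.
    producer = SimpleProducer(client, async=False)

    # Opt out: error responses come back for the caller to inspect.
    producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
    for resp in producer.send_messages('my-topic', b'hello'):
        if isinstance(resp, FailedPayloadsError):
            pass  # handle or retry at the application level
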
diff --git a/test/fixtures.py b/test/fixtures.py
index 4231452..d4d03ee 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -122,11 +122,11 @@ class ZookeeperFixture(Fixture):
# Configure Zookeeper child process
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
env = self.kafka_run_class_env()
- self.child = SpawnedService(args, env)
# Party!
self.out("Starting...")
while True:
+ self.child = SpawnedService(args, env)
self.child.start()
if self.child.wait_for(r"binding to port", timeout=5):
break
@@ -202,11 +202,6 @@ class KafkaFixture(Fixture):
properties = os.path.join(self.tmp_dir, "kafka.properties")
self.render_template(template, properties, vars(self))
- # Configure Kafka child process
- args = self.kafka_run_class_args("kafka.Kafka", properties)
- env = self.kafka_run_class_env()
- self.child = SpawnedService(args, env)
-
# Party!
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
@@ -225,7 +220,13 @@ class KafkaFixture(Fixture):
self.out("Done!")
self.out("Starting...")
+
+ # Configure Kafka child process
+ args = self.kafka_run_class_args("kafka.Kafka", properties)
+ env = self.kafka_run_class_env()
+
while True:
+ self.child = SpawnedService(args, env)
self.child.start()
if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
break
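
The fixture change is subtler than it looks: SpawnedService subclasses
threading.Thread, and a Thread instance can only be start()ed once, so
retrying with a child built outside the loop would raise RuntimeError on the
second attempt. Constructing inside the loop gives every attempt a fresh
thread and child process. The pattern, roughly (pattern stands in for the
service-specific readiness regex):

    while True:
        self.child = SpawnedService(args, env)  # fresh thread + child process
        self.child.start()
        if self.child.wait_for(pattern, timeout=5):
            break  # the service came up; keep this child
        # otherwise loop around and build a brand-new instance --
        # calling start() twice on the same Thread raises RuntimeError
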
diff --git a/test/service.py b/test/service.py
index 9368b85..b986a71 100644
--- a/test/service.py
+++ b/test/service.py
@@ -59,7 +59,8 @@ class SpawnedService(threading.Thread):
self.alive = True
def _despawn(self):
- self.child.terminate()
+ if self.child.poll() is None:
+ self.child.terminate()
self.alive = False
for _ in range(50):
if self.child.poll() is not None:
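
The poll() guard in _despawn avoids signalling a child that has already
exited: on Python 2, Popen.terminate() calls os.kill() unconditionally, which
can raise OSError ("No such process") once the child is gone. A standalone
illustration with the stdlib:

    import subprocess

    child = subprocess.Popen(['sleep', '0.1'])
    child.wait()              # the child exits and is reaped

    if child.poll() is None:  # poll() is None only while the child is alive
        child.terminate()     # so we only ever signal a live process
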
diff --git a/test/test_conn.py b/test/test_conn.py
index c4f219b..6e47cc8 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,3 +1,4 @@
+import logging
import socket
import struct
from threading import Thread
@@ -10,6 +11,10 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE
class ConnTest(unittest.TestCase):
def setUp(self):
+
+ # kafka.conn debug logging is verbose, so only enable in conn tests
+ logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
+
self.config = {
'host': 'localhost',
'port': 9090,
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c202c5c..8911e3e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_simple_consumer_blocking(self):
consumer = self.consumer()
- # Ask for 5 messages, nothing in queue, block 5 seconds
+ # Ask for 5 messages, nothing in queue, block 1 second
with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=5)
+ messages = consumer.get_messages(block=True, timeout=1)
self.assert_message_count(messages, 0)
- self.assertGreaterEqual(t.interval, 5)
+ self.assertGreaterEqual(t.interval, 1)
self.send_messages(0, range(0, 10))
@@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertLessEqual(t.interval, 1)
- # Ask for 10 messages, get 5 back, block 5 seconds
+ # Ask for 10 messages, get 5 back, block 1 second
with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=5)
+ messages = consumer.get_messages(count=10, block=True, timeout=1)
self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 5)
+ self.assertGreaterEqual(t.interval, 1)
consumer.stop()
@@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_multi_process_consumer_blocking(self):
consumer = self.consumer(consumer = MultiProcessConsumer)
- # Ask for 5 messages, No messages in queue, block 5 seconds
+ # Ask for 5 messages, No messages in queue, block 1 second
with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=5)
+ messages = consumer.get_messages(block=True, timeout=1)
self.assert_message_count(messages, 0)
- self.assertGreaterEqual(t.interval, 5)
+ self.assertGreaterEqual(t.interval, 1)
# Send 10 messages
self.send_messages(0, range(0, 10))
@@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertLessEqual(t.interval, 1)
- # Ask for 10 messages, 5 in queue, block 5 seconds
+ # Ask for 10 messages, 5 in queue, block 1 second
with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=5)
+ messages = consumer.get_messages(count=10, block=True, timeout=1)
self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 4.95)
+ self.assertGreaterEqual(t.interval, 1)
consumer.stop()
@@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.kafka_consumer(auto_offset_reset='smallest',
consumer_timeout_ms=TIMEOUT_MS)
- # Ask for 5 messages, nothing in queue, block 5 seconds
+ # Ask for 5 messages, nothing in queue, block 500ms
with Timer() as t:
with self.assertRaises(ConsumerTimeout):
msg = consumer.next()
@@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages), 5)
self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
- # Ask for 10 messages, get 5 back, block 5 seconds
+ # Ask for 10 messages, get 5 back, block 500ms
messages = set()
with Timer() as t:
with self.assertRaises(ConsumerTimeout):
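
All of these timeout reductions lean on the same get_messages contract: with
block=True, the call returns as soon as count messages are available, and
otherwise blocks for roughly the full timeout before returning whatever it
collected. The assertions above boil down to this sketch (consumer
construction elided; Timer is the testutil helper):

    with Timer() as t:
        messages = consumer.get_messages(count=10, block=True, timeout=1)
    # only 5 messages were available, so the call waited out the full second
    assert len(messages) == 5
    assert t.interval >= 1
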
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 5082d7c..91e22cf 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase):
# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+ batch_send_every_n=15,
+ batch_send_every_t=3,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+ async_log_messages_on_error=False)
# Send 10 random messages
self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition + 1, 10)
# kill leader for partition
self._kill_leader(topic, partition)
@@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase):
# in async mode, this should return immediately
producer.send_messages(topic, partition, b'success')
+ producer.send_messages(topic, partition + 1, b'success')
# send to new leader
self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition + 1, 10)
# Stop the producer and wait for it to shutdown
producer.stop()
@@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 10 before + 1 recovery + 10 after
self.assert_message_count(topic, 21, partitions=(partition,),
at_least=True)
+ self.assert_message_count(topic, 21, partitions=(partition + 1,),
+ at_least=True)
@kafka_versions("all")
def test_switch_leader_keyed_producer(self):
diff --git a/test/test_producer.py b/test/test_producer.py
index c12af02..27272f6 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,15 +1,18 @@
# -*- coding: utf-8 -*-
-import time
+import collections
import logging
+import time
from mock import MagicMock, patch
from . import unittest
-from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
-from kafka.common import AsyncProducerQueueFull
-from kafka.producer.base import Producer
-from kafka.producer.base import _send_upstream
+from kafka import KafkaClient, SimpleProducer
+from kafka.common import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+ ProduceResponse, RetryOptions, TopicAndPartition
+)
+from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
import threading
@@ -42,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, partition, m)
def test_topic_message_types(self):
- from kafka.producer.simple import SimpleProducer
-
client = MagicMock()
def partitions(topic):
@@ -73,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase):
for _ in xrange(producer.queue.qsize()):
producer.queue.get()
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(KafkaClient, '_send_broker_aware_request', return_value=[error]):
+
+ client = KafkaClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
class TestKafkaProducerSendUpstream(unittest.TestCase):
@@ -122,12 +140,21 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
self.client.is_first_time = True
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
return [FailedPayloadsError(req) for req in reqs]
- return []
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
self.client.send_produce_request.side_effect = send_side_effect
@@ -136,8 +163,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 5 non-void cals: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # plus 3 batches of 3 msgs each + 1 batch of 1 message
self.assertEqual(self.client.send_produce_request.call_count, 5)
def test_with_limited_retries(self):
@@ -157,11 +184,46 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 16 non-void cals:
+ # there should be 16 non-void calls:
# 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+ # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
self.assertEqual(self.client.send_produce_request.call_count, 16)
+ def test_async_producer_not_leader(self):
+
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [ProduceResponse(req.topic, req.partition,
+ NotLeaderForPartitionError.errno, -1)
+ for req in reqs]
+
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
def tearDown(self):
for _ in xrange(self.queue.qsize()):
self.queue.get()
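
Both side-effect closures above share one trick: a nested defaultdict tracks
the next offset per (topic, partition) so the mocked broker can hand back
monotonically increasing offsets in its ProduceResponses. A standalone sketch
(int as the inner factory is equivalent to the lambda: 0 used above):

    import collections

    offsets = collections.defaultdict(lambda: collections.defaultdict(int))

    offsets['test'][0] += 3    # first batch of 3 messages to partition 0
    offsets['test'][0] += 3    # second batch of 3
    print(offsets['test'][0])  # 6 -- the next offset for ('test', 0)
    print(offsets['test'][1])  # 0 -- unseen partitions start at zero
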
diff --git a/test/testutil.py b/test/testutil.py
index 99d8d01..3a1d2ba 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -113,3 +113,8 @@ class Timer(object):
self.interval = self.end - self.start
logging.basicConfig(level=logging.DEBUG)
+logging.getLogger('test.fixtures').setLevel(logging.ERROR)
+logging.getLogger('test.service').setLevel(logging.ERROR)
+
+# kafka.conn debug logging is verbose, disable in tests by default
+logging.getLogger('kafka.conn').setLevel(logging.INFO)
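
This pairs with the ConnTest.setUp hook earlier in the diff: Python loggers
form a hierarchy, so a level set on 'kafka.conn' overrides the root logger's
DEBUG for that subtree only, and individual tests can lower it back down. The
overall pattern:

    import logging

    logging.basicConfig(level=logging.DEBUG)  # root logger: everything on

    # Quiet a chatty module by raising its logger's level...
    logging.getLogger('kafka.conn').setLevel(logging.INFO)

    # ...and lower it again only where that output is under test,
    # as ConnTest.setUp does:
    logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
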
diff --git a/tox.ini b/tox.ini
index e3e8568..fcb8908 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@ deps =
mock
python-snappy
commands =
- nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
+ nosetests {posargs:-v -x --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
passenv = KAFKA_VERSION
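
For reference, -x is nose's --stop switch: abort the run on the first error or
failure, so a broken integration environment fails fast instead of timing out
through the whole suite.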