author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-05-15 12:58:34 +0300
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:49 +0300
commit | a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch)
tree | 0d2746735852a9e3b1f6cebfbd2e33e237ff2aea
parent | 4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff)
download | kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz
Improve async producer code: logic and style fixes
- use send_produce_request with fail_on_error=False so that only failed requests are retried
- track retry counters in an internal dict keyed by request namedtuples
- refresh metadata on refresh errors irrespective of the retry options
- remove infinite retries (retry_options.limit=None) as over-engineering
- separate producer init args for the retry options (limit, backoff, on_timeouts); see the sketch below
- AsyncProducerQueueFull now carries the list of failed messages
- producer tests improved thanks to @rogaha and @toli
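For illustration, a minimal sketch of an async producer configured through the separate retry init args introduced by this commit (the broker address and topic are placeholders, not part of the commit):

```python
from kafka import KafkaClient, SimpleProducer

# Placeholder broker address; any reachable Kafka broker works here.
client = KafkaClient('localhost:9092')

# The single async_retry_options namedtuple argument is replaced by three
# separate keyword arguments ('async' was a valid keyword name in the
# Python versions of this era).
producer = SimpleProducer(client,
                          async=True,
                          async_retry_limit=3,           # at most 3 retries per request
                          async_retry_backoff_ms=100,    # wait 100ms between retries
                          async_retry_on_timeouts=True)  # also retry RequestTimedOutError

producer.send_messages(b'my-topic', b'some message')
```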
-rw-r--r-- | kafka/common.py | 17
-rw-r--r-- | kafka/producer/base.py | 57
-rw-r--r-- | kafka/producer/keyed.py | 12
-rw-r--r-- | kafka/producer/simple.py | 12
-rw-r--r-- | test/test_producer.py | 53
5 files changed, 75 insertions, 76 deletions
```diff
diff --git a/kafka/common.py b/kafka/common.py
index 87c29f0..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -14,15 +14,8 @@ MetadataResponse = namedtuple("MetadataResponse",
     ["brokers", "topics"])
 
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-_ProduceRequest = namedtuple("ProduceRequest",
-    ["topic", "partition", "messages", "retries"])
-
-
-class ProduceRequest(_ProduceRequest):
-    def __new__(cls, topic, partition, messages, retries=0):
-        return super(ProduceRequest, cls).__new__(
-            cls, topic, partition, messages, retries)
-
+ProduceRequest = namedtuple("ProduceRequest",
+    ["topic", "partition", "messages"])
 
 ProduceResponse = namedtuple("ProduceResponse",
     ["topic", "partition", "error", "offset"])
@@ -79,7 +72,7 @@ KafkaMessage = namedtuple("KafkaMessage",
     ["topic", "partition", "offset", "key", "value"])
 
 # Define retry policy for async producer
-# Limit corner values: None - infinite retries, 0 - no retries
+# Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
 
@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):
 
 
 class AsyncProducerQueueFull(KafkaError):
-    pass
+    def __init__(self, failed_msgs, *args):
+        super(AsyncProducerQueueFull, self).__init__(*args)
+        self.failed_msgs = failed_msgs
 
 
 def _iter_broker_errors():
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 03ef2a7..602e2ed 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -34,8 +34,10 @@ BATCH_SEND_MSG_COUNT = 20
 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
 # no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1
 
 
@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    reqs = []
+    reqs = {}
     client.reinit()
 
     while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0
 
         if not reqs:
             continue
 
         reqs_to_retry, error_type = [], None
 
-        try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)
-        except FailedPayloadsError as ex:
-            error_type = FailedPayloadsError
-            reqs_to_retry = ex.failed_payloads
+        try:
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            reqs_to_retry = [req for broker_responses in reply
+                             for response in broker_responses
+                             for req in response.failed_payloads
+                             if isinstance(response, FailedPayloadsError)]
+            if reqs_to_retry:
+                error_type = FailedPayloadsError
 
         except RequestTimedOutError:
             error_type = RequestTimedOutError
             if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs
+                reqs_to_retry = reqs.keys()
 
         except Exception as ex:
             error_type = type(ex)
             if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs
-
-        finally:
-            reqs = []
+                reqs_to_retry = reqs.keys()
 
-        if not reqs_to_retry or retry_options.limit == 0:
+        if not reqs_to_retry:
+            reqs = {}
             continue
 
         # doing backoff before next retry
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()
 
-        reqs = [req._replace(retries=req.retries+1)
-                for req in reqs_to_retry
-                if not retry_options.limit or
-                (retry_options.limit and req.retries < retry_options.limit)]
+        reqs = {key: count + 1 for key, count in reqs.items()
+                if key in reqs_to_retry and count < retry_options.limit}
 
 
 class Producer(object):
@@ -161,7 +163,9 @@ class Producer(object):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
 
@@ -191,6 +195,10 @@ class Producer(object):
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -252,7 +260,7 @@ class Producer(object):
                 raise TypeError("the key must be type bytes")
 
         if self.async:
-            for m in msg:
+            for idx, m in enumerate(msg):
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ class Producer(object):
                         self.queue.put(item, True, self.async_queue_put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
+                        msg[idx:],
                         'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
         resp = []
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 0fdccd5..5252976 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,8 @@ from kafka.util import kafka_bytestring
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )
 
 log = logging.getLogger("kafka")
@@ -39,7 +39,9 @@ class KeyedProducer(Producer):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         if not partitioner:
@@ -51,7 +53,9 @@ class KeyedProducer(Producer):
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t,
-                                            async_retry_options,
+                                            async_retry_limit,
+                                            async_retry_backoff_ms,
+                                            async_retry_on_timeouts,
                                             async_queue_maxsize,
                                             async_queue_put_timeout)
 
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index f7dfc46..ded6eb6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,8 +10,8 @@ from six.moves import xrange
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )
 
 log = logging.getLogger("kafka")
@@ -47,7 +47,9 @@ class SimpleProducer(Producer):
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=True,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
@@ -56,7 +58,9 @@ class SimpleProducer(Producer):
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
-                                             async_retry_options,
+                                             async_retry_limit,
+                                             async_retry_backoff_ms,
+                                             async_retry_on_timeouts,
                                              async_queue_maxsize,
                                              async_queue_put_timeout)
 
diff --git a/test/test_producer.py b/test/test_producer.py
index 3004c2d..a2ba877 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -17,6 +17,10 @@ try:
     from queue import Empty, Queue
 except ImportError:
     from Queue import Empty, Queue
+try:
+    xrange
+except NameError:
+    xrange = range
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -52,7 +56,8 @@ class TestKafkaProducer(unittest.TestCase):
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called
 
-    def test_producer_async_queue_overfilled_batch_send(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled_batch_send(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), batch_send=True,
                             async_queue_maxsize=queue_size)
@@ -64,8 +69,12 @@ class TestKafkaProducer(unittest.TestCase):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
 
-    def test_producer_async_queue_overfilled(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), async=True,
                             async_queue_maxsize=queue_size)
@@ -77,7 +86,9 @@ class TestKafkaProducer(unittest.TestCase):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
-
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
 
 
 class TestKafkaProducerSendUpstream(unittest.TestCase):
@@ -121,7 +132,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
         # 3 batches of 3 msgs each + 1 batch of 1 message
         self.assertEqual(self.client.send_produce_request.call_count, 4)
-
     def test_first_send_failed(self):
 
         # lets create a queue and add 10 messages for 10 different partitions
@@ -133,7 +143,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
-                raise FailedPayloadsError(reqs)
+                return [[FailedPayloadsError(reqs)]]
+            return []
 
         self.client.send_produce_request.side_effect = send_side_effect
 
@@ -154,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
             self.queue.put((TopicAndPartition("test", i),
                             "msg %i" % i, "key %i" % i))
 
         def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
+            return [[FailedPayloadsError(reqs)]]
 
         self.client.send_produce_request.side_effect = send_side_effect
@@ -168,30 +179,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
         self.assertEqual(self.client.send_produce_request.call_count, 16)
 
-    def test_with_unlimited_retries(self):
-
-        # lets create a queue and add 10 messages for 10 different partitions
-        # to show how retries should work ideally
-        for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
-
-        def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(None)
-
-        # the queue should have 7 elements
-        # 3 batches of 1 msg each were retried all this time
-        self.assertEqual(self.queue.empty(), False)
-        try:
-            for i in range(7):
-                self.queue.get(timeout=0.01)
-        except Empty:
-            self.fail("Should be 7 elems in the queue")
-        self.assertEqual(self.queue.empty(), True)
-
-        # 1s / 50ms of backoff = 20 times max
-        calls = self.client.send_produce_request.call_count
-        self.assertTrue(calls > 10 & calls <= 20)
+    def tearDown(self):
+        for _ in xrange(self.queue.qsize()):
+            self.queue.get()
```
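Since AsyncProducerQueueFull now carries the messages that were never enqueued, a caller can re-submit exactly those instead of the whole batch. A rough sketch against the base Producer interface; `send_all` is a hypothetical helper and the fixed 100ms pause is illustrative, not part of the commit:

```python
import time

from kafka.common import AsyncProducerQueueFull

def send_all(producer, topic, partition, *messages):
    """Keep submitting until the async queue has accepted every message."""
    pending = list(messages)
    while pending:
        try:
            producer.send_messages(topic, partition, *pending)
            pending = []
        except AsyncProducerQueueFull as e:
            # failed_msgs holds the tail of the batch that did not fit in
            # the queue; everything before it was accepted.
            pending = e.failed_msgs
            time.sleep(0.1)  # illustrative pause before re-submitting
```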