author     Viktor Shlapakov <vshlapakov@gmail.com>    2015-05-15 12:58:34 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>    2015-06-03 11:22:49 +0300
commit     a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch)
tree       0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /kafka
parent     4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff)
download   kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz
Improve async producer code: logic and style fixes
- use send_produce_request with fail_on_error=False so that only the failed requests are retried
- track retry counters in an internal dict keyed by the ProduceRequest namedtuples
- refresh metadata on refresh errors regardless of the retry options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for the retry options (limit, backoff, on_timeouts)
- AsyncProducerQueueFull now carries the list of failed messages
- producer tests improved thanks to @rogaha and @toli
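For reference, a minimal usage sketch of the retry keyword arguments this commit adds; the broker address, topic, and payload below are placeholders, not part of the change.

    from kafka import KafkaClient, SimpleProducer

    # kafka-python 0.9.x-era API (Python 2). Placeholder broker address; the
    # async_retry_* kwargs are the ones this commit adds to the producer
    # constructors (see kafka/producer/base.py).
    client = KafkaClient('localhost:9092')
    producer = SimpleProducer(client,
                              async=True,
                              async_retry_limit=3,           # int >= 0; 0 disables retries
                              async_retry_backoff_ms=100,    # wait between retry rounds
                              async_retry_on_timeouts=True)  # also retry RequestTimedOutError
    producer.send_messages(b'my-topic', b'payload')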
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/common.py           17
-rw-r--r--  kafka/producer/base.py    57
-rw-r--r--  kafka/producer/keyed.py   12
-rw-r--r--  kafka/producer/simple.py  12
4 files changed, 55 insertions, 43 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 87c29f0..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -14,15 +14,8 @@ MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-_ProduceRequest = namedtuple("ProduceRequest",
- ["topic", "partition", "messages", "retries"])
-
-
-class ProduceRequest(_ProduceRequest):
- def __new__(cls, topic, partition, messages, retries=0):
- return super(ProduceRequest, cls).__new__(
- cls, topic, partition, messages, retries)
-
+ProduceRequest = namedtuple("ProduceRequest",
+ ["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -79,7 +72,7 @@ KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
# Define retry policy for async producer
-# Limit corner values: None - infinite retries, 0 - no retries
+# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):
class AsyncProducerQueueFull(KafkaError):
- pass
+ def __init__(self, failed_msgs, *args):
+ super(AsyncProducerQueueFull, self).__init__(*args)
+ self.failed_msgs = failed_msgs
def _iter_broker_errors():
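The failed_msgs attribute added to AsyncProducerQueueFull above lets callers recover the unsent tail of a batch. A hedged sketch, assuming the producer constructed in the earlier example; the topic and message batch are illustrative, not part of this diff:

    from kafka.common import AsyncProducerQueueFull

    # 'producer' as constructed above; topic and batch are placeholders.
    messages = [b'm1', b'm2', b'm3']
    try:
        producer.send_messages(b'my-topic', *messages)
    except AsyncProducerQueueFull as exc:
        # exc.failed_msgs is the slice of the batch that never entered the
        # async queue, so only those messages need to be resent later.
        pending = list(exc.failed_msgs)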
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 03ef2a7..602e2ed 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -34,8 +34,10 @@ BATCH_SEND_MSG_COUNT = 20
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
- limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
STOP_ASYNC_PRODUCER = -1
@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
a specified timeout and send them upstream to the brokers in one
request
"""
- reqs = []
+ reqs = {}
client.reinit()
while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
- messages)
- reqs.append(req)
+ tuple(messages))
+ reqs[req] = 0
if not reqs:
continue
reqs_to_retry, error_type = [], None
- try:
- client.send_produce_request(reqs,
- acks=req_acks,
- timeout=ack_timeout)
- except FailedPayloadsError as ex:
- error_type = FailedPayloadsError
- reqs_to_retry = ex.failed_payloads
+ try:
+ reply = client.send_produce_request(reqs.keys(),
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+ reqs_to_retry = [req for broker_responses in reply
+ for response in broker_responses
+ for req in response.failed_payloads
+ if isinstance(response, FailedPayloadsError)]
+ if reqs_to_retry:
+ error_type = FailedPayloadsError
except RequestTimedOutError:
error_type = RequestTimedOutError
if retry_options.retry_on_timeouts:
- reqs_to_retry = reqs
+ reqs_to_retry = reqs.keys()
except Exception as ex:
error_type = type(ex)
if type(ex) in RETRY_ERROR_TYPES:
- reqs_to_retry = reqs
-
- finally:
- reqs = []
+ reqs_to_retry = reqs.keys()
- if not reqs_to_retry or retry_options.limit == 0:
+ if not reqs_to_retry:
+ reqs = {}
continue
# doing backoff before next retry
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_type in RETRY_REFRESH_ERROR_TYPES:
client.load_metadata_for_topics()
- reqs = [req._replace(retries=req.retries+1)
- for req in reqs_to_retry
- if not retry_options.limit or
- (retry_options.limit and req.retries < retry_options.limit)]
+ reqs = {key: count + 1 for key, count in reqs.items()
+ if key in reqs_to_retry and count < retry_options.limit}
class Producer(object):
@@ -161,7 +163,9 @@ class Producer(object):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
@@ -191,6 +195,10 @@ class Producer(object):
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
+ async_retry_options = RetryOptions(
+ limit=async_retry_limit,
+ backoff_ms=async_retry_backoff_ms,
+ retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -252,7 +260,7 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
- for m in msg:
+ for idx, m in enumerate(msg):
try:
item = (TopicAndPartition(topic, partition), m, key)
if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ class Producer(object):
self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
+ msg[idx:],
'Producer async queue overfilled. '
'Current queue size %d.' % self.queue.qsize())
resp = []
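The heart of the change above is the reqs dict in _send_upstream: since ProduceRequest is now a plain (hashable) namedtuple, the requests themselves can key a dict of attempt counts. A standalone sketch of that pattern, where Req, send_batch, and retry_limit are toy stand-ins rather than kafka-python API:

    from collections import namedtuple

    Req = namedtuple('Req', ['topic', 'partition'])  # toy stand-in for ProduceRequest
    retry_limit = 3

    def send_batch(reqs):
        # Illustrative transport: pretend every request to partition 0 fails.
        return {r for r in reqs if r.partition == 0}

    reqs = {Req('t', p): 0 for p in range(2)}   # attempt counters start at 0
    while reqs:
        failed = send_batch(list(reqs))
        # Keep only failed requests with retries left, bumping their counters.
        reqs = {req: count + 1 for req, count in reqs.items()
                if req in failed and count < retry_limit}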
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 0fdccd5..5252976 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
- ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -39,7 +39,9 @@ class KeyedProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
@@ -51,7 +53,9 @@ class KeyedProducer(Producer):
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- async_retry_options,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index f7dfc46..ded6eb6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,8 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
- ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -47,7 +47,9 @@ class SimpleProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True,
- async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
@@ -56,7 +58,9 @@ class SimpleProducer(Producer):
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- async_retry_options,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)