summaryrefslogtreecommitdiff
path: root/kafka/producer/keyed.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-05-15 12:58:34 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:49 +0300
commita3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch)
tree0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /kafka/producer/keyed.py
parent4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff)
downloadkafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz
Improve async producer code: logic and style fixes
- send_producer_request with fail_on_error=False to retry failed reqs only - using an internal dict with with namedtuple keys for retry counters - refresh metadata on refresh_error irrespective to retries options - removed infinite retries (retry_options.limit=None) as an over-feature - separate producer init args for retries options (limit,backoff,on_timeouts) - AsyncProducerQueueFull returns a list of failed messages - producer tests improved thanks to @rogaha and @toli
Diffstat (limited to 'kafka/producer/keyed.py')
-rw-r--r--kafka/producer/keyed.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 0fdccd5..5252976 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
- ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+ BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+ ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger("kafka")
@@ -39,7 +39,9 @@ class KeyedProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_retry_limit=ASYNC_RETRY_LIMIT,
+ async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+ async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
@@ -51,7 +53,9 @@ class KeyedProducer(Producer):
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- async_retry_options,
+ async_retry_limit,
+ async_retry_backoff_ms,
+ async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)