diff options
| author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-05-15 12:58:34 +0300 |
|---|---|---|
| committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:49 +0300 |
| commit | a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch) | |
| tree | 0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /kafka/producer/keyed.py | |
| parent | 4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff) | |
| download | kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz | |
Improve async producer code: logic and style fixes
- send_producer_request with fail_on_error=False to retry failed reqs only
- using an internal dict with with namedtuple keys for retry counters
- refresh metadata on refresh_error irrespective to retries options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for retries options (limit,backoff,on_timeouts)
- AsyncProducerQueueFull returns a list of failed messages
- producer tests improved thanks to @rogaha and @toli
Diffstat (limited to 'kafka/producer/keyed.py')
| -rw-r--r-- | kafka/producer/keyed.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 0fdccd5..5252976 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,8 +7,8 @@ from kafka.util import kafka_bytestring from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS, - ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT + BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, + ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS ) log = logging.getLogger("kafka") @@ -39,7 +39,9 @@ class KeyedProducer(Producer): batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - async_retry_options=ASYNC_RETRY_OPTIONS, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): if not partitioner: @@ -51,7 +53,9 @@ class KeyedProducer(Producer): ack_timeout, codec, batch_send, batch_send_every_n, batch_send_every_t, - async_retry_options, + async_retry_limit, + async_retry_backoff_ms, + async_retry_on_timeouts, async_queue_maxsize, async_queue_put_timeout) |
