diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-05-15 12:58:34 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:49 +0300 |
commit | a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch) | |
tree | 0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /kafka/common.py | |
parent | 4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff) | |
download | kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz |
Improve async producer code: logic and style fixes
- send_producer_request with fail_on_error=False to retry failed reqs only
- using an internal dict with with namedtuple keys for retry counters
- refresh metadata on refresh_error irrespective to retries options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for retries options (limit,backoff,on_timeouts)
- AsyncProducerQueueFull returns a list of failed messages
- producer tests improved thanks to @rogaha and @toli
Diffstat (limited to 'kafka/common.py')
-rw-r--r-- | kafka/common.py | 17 |
1 files changed, 6 insertions, 11 deletions
diff --git a/kafka/common.py b/kafka/common.py index 87c29f0..8c13798 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -14,15 +14,8 @@ MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI -_ProduceRequest = namedtuple("ProduceRequest", - ["topic", "partition", "messages", "retries"]) - - -class ProduceRequest(_ProduceRequest): - def __new__(cls, topic, partition, messages, retries=0): - return super(ProduceRequest, cls).__new__( - cls, topic, partition, messages, retries) - +ProduceRequest = namedtuple("ProduceRequest", + ["topic", "partition", "messages"]) ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) @@ -79,7 +72,7 @@ KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) # Define retry policy for async producer -# Limit corner values: None - infinite retries, 0 - no retries +# Limit value: int >= 0, 0 means no retries RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"]) @@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError): class AsyncProducerQueueFull(KafkaError): - pass + def __init__(self, failed_msgs, *args): + super(AsyncProducerQueueFull, self).__init__(*args) + self.failed_msgs = failed_msgs def _iter_broker_errors(): |