summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-05-15 12:58:34 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:49 +0300
commita3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch)
tree0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /kafka/common.py
parent4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff)
downloadkafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz
Improve async producer code: logic and style fixes
- send_producer_request with fail_on_error=False to retry failed reqs only - using an internal dict with with namedtuple keys for retry counters - refresh metadata on refresh_error irrespective to retries options - removed infinite retries (retry_options.limit=None) as an over-feature - separate producer init args for retries options (limit,backoff,on_timeouts) - AsyncProducerQueueFull returns a list of failed messages - producer tests improved thanks to @rogaha and @toli
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py17
1 files changed, 6 insertions, 11 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 87c29f0..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -14,15 +14,8 @@ MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-_ProduceRequest = namedtuple("ProduceRequest",
- ["topic", "partition", "messages", "retries"])
-
-
-class ProduceRequest(_ProduceRequest):
- def __new__(cls, topic, partition, messages, retries=0):
- return super(ProduceRequest, cls).__new__(
- cls, topic, partition, messages, retries)
-
+ProduceRequest = namedtuple("ProduceRequest",
+ ["topic", "partition", "messages"])
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -79,7 +72,7 @@ KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
# Define retry policy for async producer
-# Limit corner values: None - infinite retries, 0 - no retries
+# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):
class AsyncProducerQueueFull(KafkaError):
- pass
+ def __init__(self, failed_msgs, *args):
+ super(AsyncProducerQueueFull, self).__init__(*args)
+ self.failed_msgs = failed_msgs
def _iter_broker_errors():