Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--   kafka/producer/base.py   57
1 file changed, 33 insertions, 24 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 03ef2a7..602e2ed 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -34,8 +34,10 @@ BATCH_SEND_MSG_COUNT = 20
 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
 
 # no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1
 
@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    reqs = []
+    reqs = {}
     client.reinit()
 
     while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0
 
         if not reqs:
             continue
 
         reqs_to_retry, error_type = [], None
 
-        try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)
-        except FailedPayloadsError as ex:
-            error_type = FailedPayloadsError
-            reqs_to_retry = ex.failed_payloads
+        try:
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            reqs_to_retry = [req for broker_responses in reply
+                             for response in broker_responses
+                             for req in response.failed_payloads
+                             if isinstance(response, FailedPayloadsError)]
+            if reqs_to_retry:
+                error_type = FailedPayloadsError
 
         except RequestTimedOutError:
             error_type = RequestTimedOutError
             if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs
+                reqs_to_retry = reqs.keys()
 
         except Exception as ex:
             error_type = type(ex)
             if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs
-
-        finally:
-            reqs = []
+                reqs_to_retry = reqs.keys()
 
-        if not reqs_to_retry or retry_options.limit == 0:
+        if not reqs_to_retry:
+            reqs = {}
             continue
 
         # doing backoff before next retry
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()
 
-        reqs = [req._replace(retries=req.retries+1)
-                for req in reqs_to_retry
-                if not retry_options.limit or
-                (retry_options.limit and req.retries < retry_options.limit)]
+        reqs = {key: count + 1 for key, count in reqs.items()
+                if key in reqs_to_retry and count < retry_options.limit}
 
 
 class Producer(object):
@@ -161,7 +163,9 @@ class Producer(object):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
 
@@ -191,6 +195,10 @@ class Producer(object):
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -252,7 +260,7 @@ class Producer(object):
             raise TypeError("the key must be type bytes")
 
         if self.async:
-            for m in msg:
+            for idx, m in enumerate(msg):
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ class Producer(object):
                         self.queue.put(item, True, self.async_queue_put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
+                        msg[idx:],
                         'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
         resp = []
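
Usage note (not part of the diff): the sketch below shows how a caller might drive the three new constructor keywords and recover the unsent tail of messages from the enriched AsyncProducerQueueFull. The import paths, the Producer.send_messages signature, and the e.args[0] access are assumptions based on kafka-python's layout in this era; the diff itself only shows msg[idx:] being passed as the exception's first positional argument. (`async` was still a legal keyword argument name in the Python 2 era this code targets.)

    from kafka import KafkaClient
    from kafka.common import AsyncProducerQueueFull   # assumed location
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')

    # RetryOptions is now assembled inside Producer.__init__ from three
    # plain keywords instead of being passed in as a single object:
    producer = Producer(client,
                        async=True,
                        async_retry_limit=3,           # drop a request after 3 failed attempts
                        async_retry_backoff_ms=100,    # sleep between retry rounds
                        async_retry_on_timeouts=True,  # retry RequestTimedOutError too
                        async_queue_maxsize=1000)      # bounded queue can raise Full

    try:
        producer.send_messages('my-topic', 0, b'hello', b'world')
    except AsyncProducerQueueFull as e:
        unsent = e.args[0]   # assumed: the msg[idx:] tail passed above
        # re-enqueue or log `unsent` as appropriate

The dict-based bookkeeping in _send_upstream gives each ProduceRequest its own attempt counter: requests enter reqs at 0, failed requests are re-inserted at count + 1, and anything that reaches retry_options.limit is dropped, so the default async_retry_limit=0 preserves the old no-retry behaviour.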