diff options
Diffstat (limited to 'kafka/producer/base.py')
| -rw-r--r-- | kafka/producer/base.py | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de4..a5af3d6 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -24,22 +24,26 @@ log = logging.getLogger("kafka") BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 +BATCH_RETRY_BACKOFF_MS = 300 +BATCH_RETRIES_LIMIT = 5 STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, stop_event): + req_acks, ack_timeout, retry_backoff, retries_limit, stop_event): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one request """ stop = False + reqs = [] + client.reinit() while not stop_event.is_set(): timeout = batch_time - count = batch_size + count = batch_size - len(reqs) send_at = time.time() + timeout msgset = defaultdict(list) @@ -48,7 +52,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while count > 0 and timeout >= 0: try: topic_partition, msg, key = queue.get(timeout=timeout) - except Empty: break @@ -63,7 +66,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, msgset[topic_partition].append((msg, key)) # Send collected requests upstream - reqs = [] for topic_partition, msg in msgset.items(): messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, @@ -75,8 +77,19 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + except FailedPayloadsError as ex: + log.exception("Failed payloads count %s" % len(ex.message)) + if retries_limit is None: + reqs = ex.message + continue + for req in ex.message: + if retries_limit and req.retries < retries_limit: + reqs.append(req._replace(retries=req.retries+1)) + except Exception as ex: + log.exception("Unable to send message: %s" % type(ex)) + + if reqs and retry_backoff: + time.sleep(float(retry_backoff) / 1000) class Producer(object): @@ -111,7 +124,9 @@ class Producer(object): codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, + batch_retries_limit=BATCH_RETRIES_LIMIT): if batch_send: async = True @@ -148,6 +163,8 @@ class Producer(object): batch_send_every_n, self.req_acks, self.ack_timeout, + batch_retry_backoff_ms, + batch_retries_limit, self.thread_stop_event)) # Thread will die if main thread exits |
