summaryrefslogtreecommitdiff
path: root/kafka/producer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--kafka/producer/base.py31
1 files changed, 24 insertions, 7 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..a5af3d6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -24,22 +24,26 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
+BATCH_RETRY_BACKOFF_MS = 300
+BATCH_RETRIES_LIMIT = 5
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, stop_event):
+ req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
stop = False
+ reqs = []
+ client.reinit()
while not stop_event.is_set():
timeout = batch_time
- count = batch_size
+ count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -48,7 +52,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
-
except Empty:
break
@@ -63,7 +66,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))
# Send collected requests upstream
- reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
@@ -75,8 +77,19 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except Exception:
- log.exception("Unable to send message")
+ except FailedPayloadsError as ex:
+ log.exception("Failed payloads count %s" % len(ex.message))
+ if retries_limit is None:
+ reqs = ex.message
+ continue
+ for req in ex.message:
+ if retries_limit and req.retries < retries_limit:
+ reqs.append(req._replace(retries=req.retries+1))
+ except Exception as ex:
+ log.exception("Unable to send message: %s" % type(ex))
+
+ if reqs and retry_backoff:
+ time.sleep(float(retry_backoff) / 1000)
class Producer(object):
@@ -111,7 +124,9 @@ class Producer(object):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+ batch_retries_limit=BATCH_RETRIES_LIMIT):
if batch_send:
async = True
@@ -148,6 +163,8 @@ class Producer(object):
batch_send_every_n,
self.req_acks,
self.ack_timeout,
+ batch_retry_backoff_ms,
+ batch_retries_limit,
self.thread_stop_event))
# Thread will die if main thread exits