summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-09 22:28:54 -0700
committerDana Powers <dana.powers@rd.io>2015-06-09 22:28:54 -0700
commit8b0a598045c010756e79d059736de06423b728c5 (patch)
tree5181eab8b293261852e51347dd163bfaf656bcaa
parent7b2f98f5ec2fa606b8c29e902d84a8e8dc486681 (diff)
downloadkafka-python-8b0a598045c010756e79d059736de06423b728c5.tar.gz
Add send/receive debug logging to async producer
-rw-r--r--kafka/producer/base.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 824ef5d..49090bd 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -101,7 +101,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
count = 0
log.debug('Skipping new batch collection to handle retries')
else:
- log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
+ log.debug('Batching size: %s, timeout: %s', count, timeout)
# Keep fetching till we gather enough messages or a
# timeout is reached
@@ -147,12 +147,14 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
retry_state['do_refresh'] |= True
requests = list(request_tries.keys())
- reply = client.send_produce_request(requests,
- acks=req_acks,
- timeout=ack_timeout,
- fail_on_error=False)
-
- for i, response in enumerate(reply):
+ log.debug('Sending: %s', requests)
+ responses = client.send_produce_request(requests,
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+
+ log.debug('Received: %s', responses)
+ for i, response in enumerate(responses):
error_cls = None
if isinstance(response, FailedPayloadsError):
error_cls = response.__class__
@@ -164,7 +166,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_cls:
_handle_error(error_cls, orig_req)
- log.error('Error sending ProduceRequest to %s:%d with msgs %s',
+ log.error('Error sending ProduceRequest (#%d of %d) to %s:%d '
+ 'with msgs %s', i + 1, len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))