diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-09 22:28:54 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-09 22:28:54 -0700 |
commit | 8b0a598045c010756e79d059736de06423b728c5 (patch) | |
tree | 5181eab8b293261852e51347dd163bfaf656bcaa | |
parent | 7b2f98f5ec2fa606b8c29e902d84a8e8dc486681 (diff) | |
download | kafka-python-8b0a598045c010756e79d059736de06423b728c5.tar.gz |
Add send/receive debug logging to async producer
-rw-r--r-- | kafka/producer/base.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 824ef5d..49090bd 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -101,7 +101,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, count = 0 log.debug('Skipping new batch collection to handle retries') else: - log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout)) + log.debug('Batching size: %s, timeout: %s', count, timeout) # Keep fetching till we gather enough messages or a # timeout is reached @@ -147,12 +147,14 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, retry_state['do_refresh'] |= True requests = list(request_tries.keys()) - reply = client.send_produce_request(requests, - acks=req_acks, - timeout=ack_timeout, - fail_on_error=False) - - for i, response in enumerate(reply): + log.debug('Sending: %s', requests) + responses = client.send_produce_request(requests, + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + + log.debug('Received: %s', responses) + for i, response in enumerate(responses): error_cls = None if isinstance(response, FailedPayloadsError): error_cls = response.__class__ @@ -164,7 +166,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if error_cls: _handle_error(error_cls, orig_req) - log.error('Error sending ProduceRequest to %s:%d with msgs %s', + log.error('Error sending ProduceRequest (#%d of %d) to %s:%d ' + 'with msgs %s', i + 1, len(requests), orig_req.topic, orig_req.partition, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) |