diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-10 18:38:34 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-10 18:38:34 -0800 |
commit | 8fff81468df640c0c1fc5daeb8fd8dd980c15c0c (patch) | |
tree | 8569b0b792a4b9e767563b6525c1ca0a72176397 /kafka | |
parent | 1cb732aba10e82232197e19fd84a79cbab6214c6 (diff) | |
download | kafka-python-8fff81468df640c0c1fc5daeb8fd8dd980c15c0c.tar.gz |
Move Request / Response logging from KafkaClient to BrokerConnection
and reenable kafka.conn debug logging in tests
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 6 |
2 files changed, 4 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py index ca737c4..e66190d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -144,7 +144,6 @@ class KafkaClient(object): response = conn.recv() if response is not None: decoded = decoder_fn(response) - log.debug('Response %s: %s', correlation_id, decoded) return decoded raise KafkaUnavailableError('All servers failed to process request') @@ -250,7 +249,6 @@ class KafkaClient(object): 'from server %s', correlation_id, broker) continue - log.debug('Response %s: %s', correlation_id, response) for payload_response in decoder_fn(response): topic_partition = (str(payload_response.topic), payload_response.partition) diff --git a/kafka/conn.py b/kafka/conn.py index 9907cb1..bd399a9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -75,11 +75,12 @@ class BrokerConnection(local): self._write_fd.write(message) self._write_fd.flush() except socket.error: - log.exception("Error in BrokerConnection.send()") + log.exception("Error in BrokerConnection.send(): %s", request) self.close() return None if expect_response: self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE)) + log.debug('Request %d: %s', self.correlation_id, request) return self.correlation_id def recv(self, timeout=None): @@ -100,9 +101,10 @@ class BrokerConnection(local): raise RuntimeError('Correlation ids do not match!') response = response_type.decode(self._read_fd) except (RuntimeError, socket.error, struct.error): - log.exception("Error in BrokerConnection.recv()") + log.exception("Error in BrokerConnection.recv() for request %d", correlation_id) self.close() return None + log.debug('Response %d: %s', correlation_id, response) return response def next_correlation_id_recv(self): |