diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-10 18:38:34 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-10 18:38:34 -0800 |
commit | 8fff81468df640c0c1fc5daeb8fd8dd980c15c0c (patch) | |
tree | 8569b0b792a4b9e767563b6525c1ca0a72176397 | |
parent | 1cb732aba10e82232197e19fd84a79cbab6214c6 (diff) | |
download | kafka-python-8fff81468df640c0c1fc5daeb8fd8dd980c15c0c.tar.gz |
Move Request / Response logging from KafkaClient to BrokerConnection
and reenable kafka.conn debug logging in tests
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 6 | ||||
-rw-r--r-- | test/test_conn.py | 18 | ||||
-rw-r--r-- | test/testutil.py | 3 |
4 files changed, 4 insertions, 25 deletions
diff --git a/kafka/client.py b/kafka/client.py index ca737c4..e66190d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -144,7 +144,6 @@ class KafkaClient(object): response = conn.recv() if response is not None: decoded = decoder_fn(response) - log.debug('Response %s: %s', correlation_id, decoded) return decoded raise KafkaUnavailableError('All servers failed to process request') @@ -250,7 +249,6 @@ class KafkaClient(object): 'from server %s', correlation_id, broker) continue - log.debug('Response %s: %s', correlation_id, response) for payload_response in decoder_fn(response): topic_partition = (str(payload_response.topic), payload_response.partition) diff --git a/kafka/conn.py b/kafka/conn.py index 9907cb1..bd399a9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -75,11 +75,12 @@ class BrokerConnection(local): self._write_fd.write(message) self._write_fd.flush() except socket.error: - log.exception("Error in BrokerConnection.send()") + log.exception("Error in BrokerConnection.send(): %s", request) self.close() return None if expect_response: self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE)) + log.debug('Request %d: %s', self.correlation_id, request) return self.correlation_id def recv(self, timeout=None): @@ -100,9 +101,10 @@ class BrokerConnection(local): raise RuntimeError('Correlation ids do not match!') response = response_type.decode(self._read_fd) except (RuntimeError, socket.error, struct.error): - log.exception("Error in BrokerConnection.recv()") + log.exception("Error in BrokerConnection.recv() for request %d", correlation_id) self.close() return None + log.debug('Response %d: %s', correlation_id, response) return response def next_correlation_id_recv(self): diff --git a/test/test_conn.py b/test/test_conn.py index 1bdfc1e..684ffe5 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,4 +1,3 @@ -import logging import socket import struct from threading import Thread @@ -12,9 +11,6 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE class ConnTest(unittest.TestCase): def setUp(self): - # kafka.conn debug logging is verbose, so only enable in conn tests - logging.getLogger('kafka.conn').setLevel(logging.DEBUG) - self.config = { 'host': 'localhost', 'port': 9090, @@ -50,11 +46,6 @@ class ConnTest(unittest.TestCase): # Reset any mock counts caused by __init__ self.MockCreateConn.reset_mock() - def tearDown(self): - # Return connection logging to INFO - logging.getLogger('kafka.conn').setLevel(logging.INFO) - - def test_collect_hosts__happy_path(self): hosts = "localhost:1234,localhost" results = collect_hosts(hosts) @@ -193,15 +184,6 @@ class ConnTest(unittest.TestCase): class TestKafkaConnection(unittest.TestCase): - - def setUp(self): - # kafka.conn debug logging is verbose, so only enable in conn tests - logging.getLogger('kafka.conn').setLevel(logging.DEBUG) - - def tearDown(self): - # Return connection logging to INFO - logging.getLogger('kafka.conn').setLevel(logging.INFO) - @mock.patch('socket.create_connection') def test_copy(self, socket): """KafkaConnection copies work as expected""" diff --git a/test/testutil.py b/test/testutil.py index 5c6ea1b..98fe805 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -112,6 +112,3 @@ class Timer(object): logging.basicConfig(level=logging.DEBUG) logging.getLogger('test.fixtures').setLevel(logging.ERROR) logging.getLogger('test.service').setLevel(logging.ERROR) - -# kafka.conn debug logging is verbose, disable in tests by default -logging.getLogger('kafka.conn').setLevel(logging.INFO) |