summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py2
-rw-r--r--kafka/conn.py6
-rw-r--r--test/test_conn.py18
-rw-r--r--test/testutil.py3
4 files changed, 4 insertions, 25 deletions
diff --git a/kafka/client.py b/kafka/client.py
index ca737c4..e66190d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -144,7 +144,6 @@ class KafkaClient(object):
response = conn.recv()
if response is not None:
decoded = decoder_fn(response)
- log.debug('Response %s: %s', correlation_id, decoded)
return decoded
raise KafkaUnavailableError('All servers failed to process request')
@@ -250,7 +249,6 @@ class KafkaClient(object):
'from server %s', correlation_id, broker)
continue
- log.debug('Response %s: %s', correlation_id, response)
for payload_response in decoder_fn(response):
topic_partition = (str(payload_response.topic),
payload_response.partition)
diff --git a/kafka/conn.py b/kafka/conn.py
index 9907cb1..bd399a9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -75,11 +75,12 @@ class BrokerConnection(local):
self._write_fd.write(message)
self._write_fd.flush()
except socket.error:
- log.exception("Error in BrokerConnection.send()")
+ log.exception("Error in BrokerConnection.send(): %s", request)
self.close()
return None
if expect_response:
self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE))
+ log.debug('Request %d: %s', self.correlation_id, request)
return self.correlation_id
def recv(self, timeout=None):
@@ -100,9 +101,10 @@ class BrokerConnection(local):
raise RuntimeError('Correlation ids do not match!')
response = response_type.decode(self._read_fd)
except (RuntimeError, socket.error, struct.error):
- log.exception("Error in BrokerConnection.recv()")
+ log.exception("Error in BrokerConnection.recv() for request %d", correlation_id)
self.close()
return None
+ log.debug('Response %d: %s', correlation_id, response)
return response
def next_correlation_id_recv(self):
diff --git a/test/test_conn.py b/test/test_conn.py
index 1bdfc1e..684ffe5 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,4 +1,3 @@
-import logging
import socket
import struct
from threading import Thread
@@ -12,9 +11,6 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE
class ConnTest(unittest.TestCase):
def setUp(self):
- # kafka.conn debug logging is verbose, so only enable in conn tests
- logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
-
self.config = {
'host': 'localhost',
'port': 9090,
@@ -50,11 +46,6 @@ class ConnTest(unittest.TestCase):
# Reset any mock counts caused by __init__
self.MockCreateConn.reset_mock()
- def tearDown(self):
- # Return connection logging to INFO
- logging.getLogger('kafka.conn').setLevel(logging.INFO)
-
-
def test_collect_hosts__happy_path(self):
hosts = "localhost:1234,localhost"
results = collect_hosts(hosts)
@@ -193,15 +184,6 @@ class ConnTest(unittest.TestCase):
class TestKafkaConnection(unittest.TestCase):
-
- def setUp(self):
- # kafka.conn debug logging is verbose, so only enable in conn tests
- logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
-
- def tearDown(self):
- # Return connection logging to INFO
- logging.getLogger('kafka.conn').setLevel(logging.INFO)
-
@mock.patch('socket.create_connection')
def test_copy(self, socket):
"""KafkaConnection copies work as expected"""
diff --git a/test/testutil.py b/test/testutil.py
index 5c6ea1b..98fe805 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -112,6 +112,3 @@ class Timer(object):
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('test.fixtures').setLevel(logging.ERROR)
logging.getLogger('test.service').setLevel(logging.ERROR)
-
-# kafka.conn debug logging is verbose, disable in tests by default
-logging.getLogger('kafka.conn').setLevel(logging.INFO)