summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
committermrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
commita03f0c86b8a504c0e3185cac1611131dba24f625 (patch)
tree3797524d3411640968292c6eba0141fc4c1f3457 /kafka/conn.py
parentb0cacc948539d180e4a634a06a10232770deb187 (diff)
downloadkafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py20
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 29efbf1..e85fd11 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -3,6 +3,8 @@ import socket
import struct
from threading import local
+from kafka.common import BufferUnderflowError
+
log = logging.getLogger("kafka")
@@ -12,7 +14,7 @@ class KafkaConnection(local):
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to `send` must be followed
- by a call to `recv` in order to get the correct response. Eventually,
+ by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
@@ -43,7 +45,7 @@ class KafkaConnection(local):
def _consume_response_iter(self):
"""
- This method handles the response header and error messages. It
+ This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ class KafkaConnection(local):
messagesize = size - 4
log.debug("About to read %d bytes from Kafka", messagesize)
- # Read the remainder of the response
+ # Read the remainder of the response
total = 0
while total < messagesize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
- raise BufferUnderflowError("Not enough data to read this response")
+ raise BufferUnderflowError(
+ "Not enough data to read this response")
+
total += len(resp)
yield resp
@@ -75,9 +79,13 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"Send a request to Kafka"
- log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+
+ log.debug(
+ "About to send %d bytes to Kafka, request %d" %
+ (len(payload), request_id))
+
sent = self._sock.sendall(payload)
- if sent != None:
+ if sent is not None:
raise RuntimeError("Kafka went away")
def recv(self, request_id):