diff options
author | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
commit | a03f0c86b8a504c0e3185cac1611131dba24f625 (patch) | |
tree | 3797524d3411640968292c6eba0141fc4c1f3457 /kafka/conn.py | |
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff) | |
download | kafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz |
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 29efbf1..e85fd11 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -3,6 +3,8 @@ import socket import struct from threading import local +from kafka.common import BufferUnderflowError + log = logging.getLogger("kafka") @@ -12,7 +14,7 @@ class KafkaConnection(local): A socket connection to a single Kafka broker This class is _not_ thread safe. Each call to `send` must be followed - by a call to `recv` in order to get the correct response. Eventually, + by a call to `recv` in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ @@ -43,7 +45,7 @@ class KafkaConnection(local): def _consume_response_iter(self): """ - This method handles the response header and error messages. It + This method handles the response header and error messages. It then returns an iterator for the chunks of the response """ log.debug("Handling response from Kafka") @@ -57,13 +59,15 @@ class KafkaConnection(local): messagesize = size - 4 log.debug("About to read %d bytes from Kafka", messagesize) - # Read the remainder of the response + # Read the remainder of the response total = 0 while total < messagesize: resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": - raise BufferUnderflowError("Not enough data to read this response") + raise BufferUnderflowError( + "Not enough data to read this response") + total += len(resp) yield resp @@ -75,9 +79,13 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + + log.debug( + "About to send %d bytes to Kafka, request %d" % + (len(payload), request_id)) + sent = self._sock.sendall(payload) - if sent != None: + if sent is not None: raise RuntimeError("Kafka went away") def recv(self, request_id): |