diff options
author | David Arthur <mumrah@gmail.com> | 2013-10-04 04:54:12 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-10-04 04:54:12 -0700 |
commit | cfd9f86e60429d1f7af8bcac5849808354b8719e (patch) | |
tree | 82039e80d595b4ad611a831a597521cae7939571 /kafka/conn.py | |
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff) | |
parent | 59af614d1d09db6f7e0115dcf39232bf4f0ece9a (diff) | |
download | kafka-python-cfd9f86e60429d1f7af8bcac5849808354b8719e.tar.gz |
Merge pull request #59 from mrtheb/master
flake8 (pep8 and pyflakes) clean-up
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 29efbf1..e85fd11 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -3,6 +3,8 @@ import socket import struct from threading import local +from kafka.common import BufferUnderflowError + log = logging.getLogger("kafka") @@ -12,7 +14,7 @@ class KafkaConnection(local): A socket connection to a single Kafka broker This class is _not_ thread safe. Each call to `send` must be followed - by a call to `recv` in order to get the correct response. Eventually, + by a call to `recv` in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ @@ -43,7 +45,7 @@ class KafkaConnection(local): def _consume_response_iter(self): """ - This method handles the response header and error messages. It + This method handles the response header and error messages. It then returns an iterator for the chunks of the response """ log.debug("Handling response from Kafka") @@ -57,13 +59,15 @@ class KafkaConnection(local): messagesize = size - 4 log.debug("About to read %d bytes from Kafka", messagesize) - # Read the remainder of the response + # Read the remainder of the response total = 0 while total < messagesize: resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": - raise BufferUnderflowError("Not enough data to read this response") + raise BufferUnderflowError( + "Not enough data to read this response") + total += len(resp) yield resp @@ -75,9 +79,13 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + + log.debug( + "About to send %d bytes to Kafka, request %d" % + (len(payload), request_id)) + sent = self._sock.sendall(payload) - if sent != None: + if sent is not None: raise RuntimeError("Kafka went away") def recv(self, request_id): |