diff options
-rw-r--r-- | kafka/conn.py | 18 |
1 files changed, 14 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 33c799c..0d17cb8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -112,7 +112,11 @@ class KafkaConnection(local): # TODO multiplex socket communication to allow for multi-threaded clients def send(self, request_id, payload): - "Send a request to Kafka" + """ + Send a request to Kafka + param: request_id -- can be any int (used only for debug logging...) + param: payload -- an encoded kafka packet (see KafkaProtocol) + """ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) @@ -128,12 +132,14 @@ class KafkaConnection(local): def recv(self, request_id): """ - Get a response from Kafka + Get a response packet from Kafka + param: request_id -- can be any int (only used for debug logging...) + returns encoded kafka packet response from server as type str """ log.debug("Reading response %d from Kafka" % request_id) + # Read the size off of the header resp = self._read_bytes(4) - (size,) = struct.unpack('>i', resp) # Read the remainder of the response @@ -144,6 +150,7 @@ class KafkaConnection(local): """ Create an inactive copy of the connection object A reinit() has to be done on the copy before it can be used again + return a new KafkaConnection object """ c = copy.deepcopy(self) c._sock = None @@ -151,7 +158,7 @@ class KafkaConnection(local): def close(self): """ - Close this connection + Shutdown and close the connection socket """ log.debug("Closing socket connection for %s:%d" % (self.host, self.port)) if self._sock: @@ -172,6 +179,9 @@ class KafkaConnection(local): def reinit(self): """ Re-initialize the socket connection + close current socket (if open) + and start a fresh connection + raise ConnectionError on error """ log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) |