summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py76
1 files changed, 65 insertions, 11 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 0602d70..fee44c4 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,6 +1,8 @@
+from collections import deque
import copy
import logging
from random import shuffle
+from select import select
import socket
import struct
from threading import local
@@ -24,26 +26,78 @@ class BrokerConnection(local):
self.host = host
self.port = port
self.timeout = timeout
- self._sock = socket.create_connection((host, port), timeout)
- self.fd = self._sock.makefile(mode='+')
+ self._write_fd = None
+ self._read_fd = None
self.correlation_id = 0
+ self.in_flight_requests = deque()
+
+ def connect(self):
+ if self.connected():
+ self.close()
+ try:
+ sock = socket.create_connection((self.host, self.port), self.timeout)
+ self._write_fd = sock.makefile('wb')
+ self._read_fd = sock.makefile('rb')
+ except socket.error as e:
+ log.exception("Error in BrokerConnection.connect()")
+ return None
+ self.in_flight_requests.clear()
+ return True
+
+ def connected(self):
+ return (self._read_fd is not None and self._write_fd is not None)
def close(self):
- self.fd.close()
- self._sock.close()
+ if self.connected():
+ try:
+ self._read_fd.close()
+ self._write_fd.close()
+ except socket.error as e:
+ log.exception("Error in BrokerConnection.close()")
+ pass
+ self._read_fd = None
+ self._write_fd = None
+ self.in_flight_requests.clear()
def send(self, request):
+ if not self.connected() and not self.connect():
+ return None
self.correlation_id += 1
header = RequestHeader(request, correlation_id=self.correlation_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
- self.fd.write(size)
- self.fd.write(message)
- self.fd.flush()
-
- size = Int32.decode(self.fd)
- correlation_id = Int32.decode(self.fd)
- return request.RESPONSE_TYPE.decode(self.fd)
+ try:
+ self._write_fd.write(size)
+ self._write_fd.write(message)
+ self._write_fd.flush()
+ except socket.error as e:
+ log.exception("Error in BrokerConnection.send()")
+ self.close()
+ return None
+ self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE))
+ return self.correlation_id
+
+ def recv(self, timeout=None):
+ if not self.connected():
+ return None
+ readable, _, _ = select([self._read_fd], [], [], timeout)
+ if not readable:
+ return None
+ correlation_id, response_type = self.in_flight_requests.popleft()
+ # Current implementation does not use size
+ # instead we read directly from the socket fd buffer
+ # alternatively, we could read size bytes into a separate buffer
+ # and decode from that buffer (and verify buffer is empty afterwards)
+ size = Int32.decode(self._read_fd)
+ recv_correlation_id = Int32.decode(self._read_fd)
+ assert correlation_id == recv_correlation_id
+ try:
+ response = response_type.decode(self._read_fd)
+ except socket.error as e:
+ log.exception("Error in BrokerConnection.recv()")
+ self.close()
+ return None
+ return response
def __getnewargs__(self):
return (self.host, self.port, self.timeout)