summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py113
1 files changed, 74 insertions, 39 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index c2b8fb0..a05ce8e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -26,9 +26,9 @@ DEFAULT_KAFKA_PORT = 9092
class ConnectionStates(object):
- DISCONNECTED = 1
- CONNECTING = 2
- CONNECTED = 3
+ DISCONNECTED = '<disconnected>'
+ CONNECTING = '<connecting>'
+ CONNECTED = '<connected>'
InFlightRequest = collections.namedtuple('InFlightRequest',
@@ -37,10 +37,12 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
_receive_buffer_bytes = 32768
- _send_buffer_bytes = 32768
+ _send_buffer_bytes = 131072
_client_id = 'kafka-python-0.10.0'
_correlation_id = 0
_request_timeout_ms = 40000
+ _max_in_flight_requests_per_connection = 5
+ _reconnect_backoff_ms = 50
def __init__(self, host, port, **kwargs):
self.host = host
@@ -48,7 +50,9 @@ class BrokerConnection(object):
self.in_flight_requests = collections.deque()
for config in ('receive_buffer_bytes', 'send_buffer_bytes',
- 'client_id', 'correlation_id', 'request_timeout_ms'):
+ 'client_id', 'correlation_id', 'request_timeout_ms',
+ 'max_in_flight_requests_per_connection',
+ 'reconnect_backoff_ms'):
if config in kwargs:
setattr(self, '_' + config, kwargs.pop(config))
@@ -57,8 +61,9 @@ class BrokerConnection(object):
self._rbuffer = io.BytesIO()
self._receiving = False
self._next_payload_bytes = 0
- self._last_connection_attempt = None
- self._last_connection_failure = None
+ self.last_attempt = 0
+ self.last_failure = 0
+ self._processing = False
def connect(self):
"""Attempt to connect and return ConnectionState"""
@@ -69,34 +74,47 @@ class BrokerConnection(object):
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
self._sock.setblocking(False)
ret = self._sock.connect_ex((self.host, self.port))
- self._last_connection_attempt = time.time()
+ self.last_attempt = time.time()
if not ret or ret is errno.EISCONN:
self.state = ConnectionStates.CONNECTED
elif ret in (errno.EINPROGRESS, errno.EALREADY):
self.state = ConnectionStates.CONNECTING
else:
- log.error('Connect attempt returned error %s. Disconnecting.', ret)
+ log.error('Connect attempt to %s returned error %s.'
+ ' Disconnecting.', self, ret)
self.close()
- self._last_connection_failure = time.time()
+ self.last_failure = time.time()
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
- if time.time() > (self._request_timeout_ms / 1000.0) + self._last_connection_attempt:
- log.error('Connection attempt timed out')
+ if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt:
+ log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ?
- self._last_connection_failure = time.time()
+ self.last_failure = time.time()
ret = self._sock.connect_ex((self.host, self.port))
if not ret or ret is errno.EISCONN:
self.state = ConnectionStates.CONNECTED
elif ret is not errno.EALREADY:
- log.error('Connect attempt returned error %s. Disconnecting.', ret)
+ log.error('Connect attempt to %s returned error %s.'
+ ' Disconnecting.', self, ret)
self.close()
- self._last_connection_failure = time.time()
+ self.last_failure = time.time()
return self.state
+ def blacked_out(self):
+ """
+ Return true if we are disconnected from the given node and can't
+ re-establish a connection yet
+ """
+ if self.state is ConnectionStates.DISCONNECTED:
+ now = time.time()
+ if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0:
+ return True
+ return False
+
def connected(self):
return self.state is ConnectionStates.CONNECTED
@@ -105,17 +123,15 @@ class BrokerConnection(object):
self._sock.close()
self._sock = None
self.state = ConnectionStates.DISCONNECTED
-
+ self._receiving = False
+ self._next_payload_bytes = 0
+ self._rbuffer.seek(0)
+ self._rbuffer.truncate()
if error is None:
error = Errors.DisconnectError()
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
- self.in_flight_requests.clear()
- self._receiving = False
- self._next_payload_bytes = 0
- self._rbuffer.seek(0)
- self._rbuffer.truncate()
def send(self, request, expect_response=True):
"""send request, return Future()
@@ -125,6 +141,8 @@ class BrokerConnection(object):
future = Future()
if not self.connected():
return future.failure(Errors.DisconnectError())
+ if not self.can_send_more():
+ return future.failure(Errors.TooManyInFlightRequests())
self._correlation_id += 1
header = RequestHeader(request,
correlation_id=self._correlation_id,
@@ -142,10 +160,10 @@ class BrokerConnection(object):
assert sent_bytes == len(message)
self._sock.setblocking(False)
except (AssertionError, socket.error) as e:
- log.debug("Error in BrokerConnection.send(): %s", request)
+ log.exception("Error sending %s to %s", request, self)
self.close(error=e)
return future.failure(e)
- log.debug('Request %d: %s', self._correlation_id, request)
+ log.debug('%s Request %d: %s', self, self._correlation_id, request)
if expect_response:
ifr = InFlightRequest(request=request,
@@ -159,24 +177,35 @@ class BrokerConnection(object):
return future
+ def can_send_more(self):
+ return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection
+
def recv(self, timeout=0):
"""Non-blocking network receive
Return response if available
"""
+ if self._processing:
+ raise Errors.IllegalStateError('Recursive connection processing'
+ ' not supported')
if not self.connected():
- log.warning('Cannot recv: socket not connected')
+ log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
self.close()
return None
- if not self.in_flight_requests:
- log.warning('No in-flight-requests to recv')
+ elif not self.in_flight_requests:
+ log.warning('%s: No in-flight-requests to recv', self)
return None
- self._fail_timed_out_requests()
+ elif self._requests_timed_out():
+ log.warning('%s timed out after %s ms. Closing connection.',
+ self, self._request_timeout_ms)
+ self.close(error=Errors.RequestTimedOutError(
+ 'Request timed out after %s ms' % self._request_timeout_ms))
+ return None
readable, _, _ = select([self._sock], [], [], timeout)
if not readable:
@@ -193,7 +222,8 @@ class BrokerConnection(object):
# This shouldn't happen after selecting above
# but just in case
return None
- log.exception("Error receiving 4-byte payload header - closing socket")
+ log.exception('%s: Error receiving 4-byte payload header -'
+ ' closing socket', self)
self.close(error=e)
return None
@@ -216,7 +246,7 @@ class BrokerConnection(object):
# header, but nothing to read in the body yet
if e.errno == errno.EWOULDBLOCK:
return None
- log.exception()
+ log.exception('%s: Error in recv', self)
self.close(error=e)
return None
@@ -236,6 +266,11 @@ class BrokerConnection(object):
return response
def _process_response(self, read_buffer):
+ if self._processing:
+ raise Errors.IllegalStateError('Recursive connection processing'
+ ' not supported')
+ else:
+ self._processing = True
ifr = self.in_flight_requests.popleft()
# verify send/recv correlation ids match
@@ -246,23 +281,23 @@ class BrokerConnection(object):
% (ifr.correlation_id, recv_correlation_id))
ifr.future.fail(error)
self.close()
+ self._processing = False
return None
# decode response
response = ifr.response_type.decode(read_buffer)
+ log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
ifr.future.success(response)
- log.debug('Response %d: %s', ifr.correlation_id, response)
+ self._processing = False
return response
- def _fail_timed_out_requests(self):
- now = time.time()
- while self.in_flight_requests:
- next_timeout = self.in_flight_requests[0].timestamp + (self._request_timeout_ms / 1000.0)
- if now < next_timeout:
- break
- timed_out = self.in_flight_requests.popleft()
- error = Errors.RequestTimedOutError('Request timed out after %s ms' % self._request_timeout_ms)
- timed_out.future.failure(error)
+ def _requests_timed_out(self):
+ if self.in_flight_requests:
+ oldest_at = self.in_flight_requests[0].timestamp
+ timeout = self._request_timeout_ms / 1000.0
+ if time.time() >= oldest_at + timeout:
+ return True
+ return False
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)