diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 113 |
1 files changed, 74 insertions, 39 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index c2b8fb0..a05ce8e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -26,9 +26,9 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): - DISCONNECTED = 1 - CONNECTING = 2 - CONNECTED = 3 + DISCONNECTED = '<disconnected>' + CONNECTING = '<connecting>' + CONNECTED = '<connected>' InFlightRequest = collections.namedtuple('InFlightRequest', @@ -37,10 +37,12 @@ InFlightRequest = collections.namedtuple('InFlightRequest', class BrokerConnection(object): _receive_buffer_bytes = 32768 - _send_buffer_bytes = 32768 + _send_buffer_bytes = 131072 _client_id = 'kafka-python-0.10.0' _correlation_id = 0 _request_timeout_ms = 40000 + _max_in_flight_requests_per_connection = 5 + _reconnect_backoff_ms = 50 def __init__(self, host, port, **kwargs): self.host = host @@ -48,7 +50,9 @@ class BrokerConnection(object): self.in_flight_requests = collections.deque() for config in ('receive_buffer_bytes', 'send_buffer_bytes', - 'client_id', 'correlation_id', 'request_timeout_ms'): + 'client_id', 'correlation_id', 'request_timeout_ms', + 'max_in_flight_requests_per_connection', + 'reconnect_backoff_ms'): if config in kwargs: setattr(self, '_' + config, kwargs.pop(config)) @@ -57,8 +61,9 @@ class BrokerConnection(object): self._rbuffer = io.BytesIO() self._receiving = False self._next_payload_bytes = 0 - self._last_connection_attempt = None - self._last_connection_failure = None + self.last_attempt = 0 + self.last_failure = 0 + self._processing = False def connect(self): """Attempt to connect and return ConnectionState""" @@ -69,34 +74,47 @@ class BrokerConnection(object): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes) self._sock.setblocking(False) ret = self._sock.connect_ex((self.host, self.port)) - self._last_connection_attempt = time.time() + self.last_attempt = time.time() if not ret or ret is errno.EISCONN: self.state = ConnectionStates.CONNECTED elif ret in (errno.EINPROGRESS, errno.EALREADY): self.state = ConnectionStates.CONNECTING else: - log.error('Connect attempt returned error %s. Disconnecting.', ret) + log.error('Connect attempt to %s returned error %s.' + ' Disconnecting.', self, ret) self.close() - self._last_connection_failure = time.time() + self.last_failure = time.time() if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status - if time.time() > (self._request_timeout_ms / 1000.0) + self._last_connection_attempt: - log.error('Connection attempt timed out') + if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt: + log.error('Connection attempt to %s timed out', self) self.close() # error=TimeoutError ? - self._last_connection_failure = time.time() + self.last_failure = time.time() ret = self._sock.connect_ex((self.host, self.port)) if not ret or ret is errno.EISCONN: self.state = ConnectionStates.CONNECTED elif ret is not errno.EALREADY: - log.error('Connect attempt returned error %s. Disconnecting.', ret) + log.error('Connect attempt to %s returned error %s.' + ' Disconnecting.', self, ret) self.close() - self._last_connection_failure = time.time() + self.last_failure = time.time() return self.state + def blacked_out(self): + """ + Return true if we are disconnected from the given node and can't + re-establish a connection yet + """ + if self.state is ConnectionStates.DISCONNECTED: + now = time.time() + if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0: + return True + return False + def connected(self): return self.state is ConnectionStates.CONNECTED @@ -105,17 +123,15 @@ class BrokerConnection(object): self._sock.close() self._sock = None self.state = ConnectionStates.DISCONNECTED - + self._receiving = False + self._next_payload_bytes = 0 + self._rbuffer.seek(0) + self._rbuffer.truncate() if error is None: error = Errors.DisconnectError() while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) - self.in_flight_requests.clear() - self._receiving = False - self._next_payload_bytes = 0 - self._rbuffer.seek(0) - self._rbuffer.truncate() def send(self, request, expect_response=True): """send request, return Future() @@ -125,6 +141,8 @@ class BrokerConnection(object): future = Future() if not self.connected(): return future.failure(Errors.DisconnectError()) + if not self.can_send_more(): + return future.failure(Errors.TooManyInFlightRequests()) self._correlation_id += 1 header = RequestHeader(request, correlation_id=self._correlation_id, @@ -142,10 +160,10 @@ class BrokerConnection(object): assert sent_bytes == len(message) self._sock.setblocking(False) except (AssertionError, socket.error) as e: - log.debug("Error in BrokerConnection.send(): %s", request) + log.exception("Error sending %s to %s", request, self) self.close(error=e) return future.failure(e) - log.debug('Request %d: %s', self._correlation_id, request) + log.debug('%s Request %d: %s', self, self._correlation_id, request) if expect_response: ifr = InFlightRequest(request=request, @@ -159,24 +177,35 @@ class BrokerConnection(object): return future + def can_send_more(self): + return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection + def recv(self, timeout=0): """Non-blocking network receive Return response if available """ + if self._processing: + raise Errors.IllegalStateError('Recursive connection processing' + ' not supported') if not self.connected(): - log.warning('Cannot recv: socket not connected') + log.warning('%s cannot recv: socket not connected', self) # If requests are pending, we should close the socket and # fail all the pending request futures if self.in_flight_requests: self.close() return None - if not self.in_flight_requests: - log.warning('No in-flight-requests to recv') + elif not self.in_flight_requests: + log.warning('%s: No in-flight-requests to recv', self) return None - self._fail_timed_out_requests() + elif self._requests_timed_out(): + log.warning('%s timed out after %s ms. Closing connection.', + self, self._request_timeout_ms) + self.close(error=Errors.RequestTimedOutError( + 'Request timed out after %s ms' % self._request_timeout_ms)) + return None readable, _, _ = select([self._sock], [], [], timeout) if not readable: @@ -193,7 +222,8 @@ class BrokerConnection(object): # This shouldn't happen after selecting above # but just in case return None - log.exception("Error receiving 4-byte payload header - closing socket") + log.exception('%s: Error receiving 4-byte payload header -' + ' closing socket', self) self.close(error=e) return None @@ -216,7 +246,7 @@ class BrokerConnection(object): # header, but nothing to read in the body yet if e.errno == errno.EWOULDBLOCK: return None - log.exception() + log.exception('%s: Error in recv', self) self.close(error=e) return None @@ -236,6 +266,11 @@ class BrokerConnection(object): return response def _process_response(self, read_buffer): + if self._processing: + raise Errors.IllegalStateError('Recursive connection processing' + ' not supported') + else: + self._processing = True ifr = self.in_flight_requests.popleft() # verify send/recv correlation ids match @@ -246,23 +281,23 @@ class BrokerConnection(object): % (ifr.correlation_id, recv_correlation_id)) ifr.future.fail(error) self.close() + self._processing = False return None # decode response response = ifr.response_type.decode(read_buffer) + log.debug('%s Response %d: %s', self, ifr.correlation_id, response) ifr.future.success(response) - log.debug('Response %d: %s', ifr.correlation_id, response) + self._processing = False return response - def _fail_timed_out_requests(self): - now = time.time() - while self.in_flight_requests: - next_timeout = self.in_flight_requests[0].timestamp + (self._request_timeout_ms / 1000.0) - if now < next_timeout: - break - timed_out = self.in_flight_requests.popleft() - error = Errors.RequestTimedOutError('Request timed out after %s ms' % self._request_timeout_ms) - timed_out.future.failure(error) + def _requests_timed_out(self): + if self.in_flight_requests: + oldest_at = self.in_flight_requests[0].timestamp + timeout = self._request_timeout_ms / 1000.0 + if time.time() >= oldest_at + timeout: + return True + return False def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) |