diff options
-rw-r--r-- | kafka/conn.py | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 31e4e95..16ac4dc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -587,6 +587,8 @@ class BrokerConnection(object): size = Int32.encode(len(msg)) try: with self._lock: + if not self._can_send_recv(): + return future.failure(Errors.NodeNotReadyError(str(self))) self._send_bytes_blocking(size + msg) # The server will send a zero sized message (that is Int32(0)) on success. @@ -616,6 +618,8 @@ class BrokerConnection(object): log.debug('%s: GSSAPI name: %s', self, gssapi_name) self._lock.acquire() + if not self._can_send_recv(): + return future.failure(Errors.NodeNotReadyError(str(self))) # Establish security context and negotiate protection level # For reference RFC 2222, section 7.2.1 try: @@ -677,6 +681,8 @@ class BrokerConnection(object): msg = bytes(self._build_oauth_client_request().encode("utf-8")) size = Int32.encode(len(msg)) self._lock.acquire() + if not self._can_send_recv(): + return future.failure(Errors.NodeNotReadyError(str(self))) try: # Send SASL OAuthBearer request with OAuth token self._send_bytes_blocking(size + msg) @@ -816,6 +822,11 @@ class BrokerConnection(object): for (_correlation_id, (future, _timestamp)) in ifrs: future.failure(error) + def _can_send_recv(self): + """Return True iff socket is ready for requests / responses""" + return self.state in (ConnectionStates.AUTHENTICATING, + ConnectionStates.CONNECTED) + def send(self, request, blocking=True): """Queue request for async network send, return Future()""" future = Future() @@ -830,8 +841,7 @@ class BrokerConnection(object): def _send(self, request, blocking=True): future = Future() with self._lock: - if self.state not in (ConnectionStates.AUTHENTICATING, - ConnectionStates.CONNECTED): + if not self._can_send_recv(): return future.failure(Errors.NodeNotReadyError(str(self))) correlation_id = self._protocol.send_request(request) @@ -855,8 +865,7 @@ class BrokerConnection(object): """Can block on network if request is larger than send_buffer_bytes""" try: with self._lock: - if self.state not in (ConnectionStates.AUTHENTICATING, - ConnectionStates.CONNECTED): + if not self._can_send_recv(): return Errors.NodeNotReadyError(str(self)) # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block @@ -882,19 +891,6 @@ class BrokerConnection(object): Return list of (response, future) tuples """ - if self.state not in (ConnectionStates.AUTHENTICATING, - ConnectionStates.CONNECTED): - log.warning('%s cannot recv: socket not connected', self) - # If requests are pending, we should close the socket and - # fail all the pending request futures - if self.in_flight_requests: - self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests')) - return () - - elif not self.in_flight_requests: - log.warning('%s: No in-flight-requests to recv', self) - return () - responses = self._recv() if not responses and self.requests_timed_out(): log.warning('%s timed out after %s ms. Closing connection.', @@ -925,6 +921,11 @@ class BrokerConnection(object): """Take all available bytes from socket, return list of any responses from parser""" recvd = [] self._lock.acquire() + if not self._can_send_recv(): + log.warning('%s cannot recv: socket not connected', self) + self._lock.release() + return () + while len(recvd) < self.config['sock_chunk_buffer_count']: try: data = self._sock.recv(self.config['sock_chunk_bytes']) |