summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py35
1 files changed, 18 insertions, 17 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 31e4e95..16ac4dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -587,6 +587,8 @@ class BrokerConnection(object):
size = Int32.encode(len(msg))
try:
with self._lock:
+ if not self._can_send_recv():
+ return future.failure(Errors.NodeNotReadyError(str(self)))
self._send_bytes_blocking(size + msg)
# The server will send a zero sized message (that is Int32(0)) on success.
@@ -616,6 +618,8 @@ class BrokerConnection(object):
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
self._lock.acquire()
+ if not self._can_send_recv():
+ return future.failure(Errors.NodeNotReadyError(str(self)))
# Establish security context and negotiate protection level
# For reference RFC 2222, section 7.2.1
try:
@@ -677,6 +681,8 @@ class BrokerConnection(object):
msg = bytes(self._build_oauth_client_request().encode("utf-8"))
size = Int32.encode(len(msg))
self._lock.acquire()
+ if not self._can_send_recv():
+ return future.failure(Errors.NodeNotReadyError(str(self)))
try:
# Send SASL OAuthBearer request with OAuth token
self._send_bytes_blocking(size + msg)
@@ -816,6 +822,11 @@ class BrokerConnection(object):
for (_correlation_id, (future, _timestamp)) in ifrs:
future.failure(error)
+ def _can_send_recv(self):
+ """Return True iff socket is ready for requests / responses"""
+ return self.state in (ConnectionStates.AUTHENTICATING,
+ ConnectionStates.CONNECTED)
+
def send(self, request, blocking=True):
"""Queue request for async network send, return Future()"""
future = Future()
@@ -830,8 +841,7 @@ class BrokerConnection(object):
def _send(self, request, blocking=True):
future = Future()
with self._lock:
- if self.state not in (ConnectionStates.AUTHENTICATING,
- ConnectionStates.CONNECTED):
+ if not self._can_send_recv():
return future.failure(Errors.NodeNotReadyError(str(self)))
correlation_id = self._protocol.send_request(request)
@@ -855,8 +865,7 @@ class BrokerConnection(object):
"""Can block on network if request is larger than send_buffer_bytes"""
try:
with self._lock:
- if self.state not in (ConnectionStates.AUTHENTICATING,
- ConnectionStates.CONNECTED):
+ if not self._can_send_recv():
return Errors.NodeNotReadyError(str(self))
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
@@ -882,19 +891,6 @@ class BrokerConnection(object):
Return list of (response, future) tuples
"""
- if self.state not in (ConnectionStates.AUTHENTICATING,
- ConnectionStates.CONNECTED):
- log.warning('%s cannot recv: socket not connected', self)
- # If requests are pending, we should close the socket and
- # fail all the pending request futures
- if self.in_flight_requests:
- self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
- return ()
-
- elif not self.in_flight_requests:
- log.warning('%s: No in-flight-requests to recv', self)
- return ()
-
responses = self._recv()
if not responses and self.requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
@@ -925,6 +921,11 @@ class BrokerConnection(object):
"""Take all available bytes from socket, return list of any responses from parser"""
recvd = []
self._lock.acquire()
+ if not self._can_send_recv():
+ log.warning('%s cannot recv: socket not connected', self)
+ self._lock.release()
+ return ()
+
while len(recvd) < self.config['sock_chunk_buffer_count']:
try:
data = self._sock.recv(self.config['sock_chunk_bytes'])