summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-06 06:41:13 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-03-07 16:52:04 -0800
commit23132863d0e00bd8aabc0e19c7e1822dabfb05b9 (patch)
treec77c01745bb1b37a2fe2489c1817364f2fd8add0
parent492b7d27f5cc4ecfde7af1e380162c5fd9e4206a (diff)
downloadkafka-python-23132863d0e00bd8aabc0e19c7e1822dabfb05b9.tar.gz
Dont acquire lock during KafkaClient.send if node is connected / ready
-rw-r--r--kafka/client_async.py15
1 files changed, 8 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e9d5919..c1bdd82 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -499,14 +499,15 @@ class KafkaClient(object):
return True
def _can_send_request(self, node_id):
- with self._lock:
- if node_id not in self._conns:
- return False
- conn = self._conns[node_id]
- return conn.connected() and conn.can_send_more()
+ conn = self._conns.get(node_id)
+ if not conn:
+ return False
+ return conn.connected() and conn.can_send_more()
def send(self, node_id, request):
- """Send a request to a specific node.
+ """Send a request to a specific node. Bytes are placed on an
+ internal per-connection send-queue. Actual network I/O will be
+ triggered in a subsequent call to .poll()
Arguments:
node_id (int): destination node
@@ -518,7 +519,7 @@ class KafkaClient(object):
Returns:
Future: resolves to Response struct or Error
"""
- with self._lock:
+ if not self._can_send_request(node_id):
if not self._maybe_connect(node_id):
return Future().failure(Errors.NodeNotReadyError(node_id))