diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-06 06:41:13 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-03-07 16:52:04 -0800 |
commit | 23132863d0e00bd8aabc0e19c7e1822dabfb05b9 (patch) | |
tree | c77c01745bb1b37a2fe2489c1817364f2fd8add0 | |
parent | 492b7d27f5cc4ecfde7af1e380162c5fd9e4206a (diff) | |
download | kafka-python-23132863d0e00bd8aabc0e19c7e1822dabfb05b9.tar.gz |
Dont acquire lock during KafkaClient.send if node is connected / ready
-rw-r--r-- | kafka/client_async.py | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e9d5919..c1bdd82 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -499,14 +499,15 @@ class KafkaClient(object): return True def _can_send_request(self, node_id): - with self._lock: - if node_id not in self._conns: - return False - conn = self._conns[node_id] - return conn.connected() and conn.can_send_more() + conn = self._conns.get(node_id) + if not conn: + return False + return conn.connected() and conn.can_send_more() def send(self, node_id, request): - """Send a request to a specific node. + """Send a request to a specific node. Bytes are placed on an + internal per-connection send-queue. Actual network I/O will be + triggered in a subsequent call to .poll() Arguments: node_id (int): destination node @@ -518,7 +519,7 @@ class KafkaClient(object): Returns: Future: resolves to Response struct or Error """ - with self._lock: + if not self._can_send_request(node_id): if not self._maybe_connect(node_id): return Future().failure(Errors.NodeNotReadyError(node_id)) |