summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 11:50:42 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 12:16:50 -0800
commitcfae9e3fa3432fad6bbd97c1d81f8ea4bc0ee363 (patch)
tree5d1d555a68d88489593362fa1f496b7f1278369b /kafka/client_async.py
parent9bc01657ed9402b502f7156ae95764029436eab3 (diff)
downloadkafka-python-cfae9e3fa3432fad6bbd97c1d81f8ea4bc0ee363.tar.gz
Remove unnecessary calls in KafkaClient._poll
- Dont process connections; outer poll() loop does this now - Only recv connections that select says are readable
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py26
1 files changed, 13 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ca81214..eaa5ef0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -264,22 +264,22 @@ class KafkaClient(object):
def _poll(self, timeout):
# select on reads across all connected sockets, blocking up to timeout
- sockets = [conn._sock for conn in six.itervalues(self._conns)
- if (conn.state is ConnectionStates.CONNECTED and
- conn.in_flight_requests)]
- if sockets:
- select.select(sockets, [], [], timeout)
+ sockets = dict([(conn._sock, conn)
+ for conn in six.itervalues(self._conns)
+ if (conn.state is ConnectionStates.CONNECTED
+ and conn.in_flight_requests)])
+ if not sockets:
+ return []
+
+ ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
responses = []
# list, not iterator, because inline callbacks may add to self._conns
- for conn in list(self._conns.values()):
- if conn.state is ConnectionStates.CONNECTING:
- conn.connect()
-
- if conn.in_flight_requests:
- response = conn.recv() # This will run callbacks / errbacks
- if response:
- responses.append(response)
+ for sock in ready:
+ conn = sockets[sock]
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ if response:
+ responses.append(response)
return responses
def in_flight_request_count(self, node_id=None):