summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py48
1 files changed, 30 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 13777a4..68277ed 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,6 +2,7 @@ import collections
import copy
import functools
import logging
+import select
import time
import kafka.common
@@ -177,6 +178,10 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
# and collect the responses and errors
broker_failures = []
+
+ # For each KafkaConnection we store the real socket so that we can use
+ # a select to perform unblocking I/O
+ socket_connection = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -210,27 +215,34 @@ class KafkaClient(object):
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
+ else:
+ socket_connection[conn.get_connected_socket()] = (conn, broker)
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ conn = None
+ while socket_connection:
+ sockets = socket_connection.keys()
+ rlist, _, _ = select.select(sockets, [], [], None)
+ conn, broker = socket_connection.pop(rlist[0])
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError as e:
+ broker_failures.append(broker)
+ log.warning('ConnectionError attempting to receive a '
+ 'response to request %s from server %s: %s',
+ requestId, broker, e)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ for payload in payloads:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ else:
+ _resps = []
+ for payload_response in decoder_fn(response):
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request