diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-10-12 00:11:18 -0700 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-11-10 17:57:57 -0800 |
commit | 04920bb89f9d73e4028dbd487719975c65954592 (patch) | |
tree | d02675d919e29670a01c385c3eccb48ce1baf804 /kafka/client.py | |
parent | e99a934bab1d551d07dd0c6365f6a730028489f3 (diff) | |
download | kafka-python-04920bb89f9d73e4028dbd487719975c65954592.tar.gz |
Unblocking broker aware request
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 48 |
1 files changed, 30 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py index 13777a4..68277ed 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,6 +2,7 @@ import collections import copy import functools import logging +import select import time import kafka.common @@ -177,6 +178,10 @@ class KafkaClient(object): # For each broker, send the list of request payloads # and collect the responses and errors broker_failures = [] + + # For each KafkaConnection we store the real socket so that we can use + # a select to perform unblocking I/O + socket_connection = {} for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() log.debug('Request %s to %s: %s', requestId, broker, payloads) @@ -210,27 +215,34 @@ class KafkaClient(object): topic_partition = (payload.topic, payload.partition) responses[topic_partition] = None continue + else: + socket_connection[conn.get_connected_socket()] = (conn, broker) - try: - response = conn.recv(requestId) - except ConnectionError as e: - broker_failures.append(broker) - log.warning('ConnectionError attempting to receive a ' - 'response to request %s from server %s: %s', - requestId, broker, e) + conn = None + while socket_connection: + sockets = socket_connection.keys() + rlist, _, _ = select.select(sockets, [], [], None) + conn, broker = socket_connection.pop(rlist[0]) + try: + response = conn.recv(requestId) + except ConnectionError as e: + broker_failures.append(broker) + log.warning('ConnectionError attempting to receive a ' + 'response to request %s from server %s: %s', + requestId, broker, e) - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) - else: - _resps = [] - for payload_response in decoder_fn(response): - topic_partition = (payload_response.topic, - payload_response.partition) - responses[topic_partition] = payload_response - _resps.append(payload_response) - log.debug('Response %s: %s', requestId, _resps) + else: + _resps = [] + for payload_response in decoder_fn(response): + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) # Connection errors generally mean stale metadata # although sometimes it means incorrect api request |