diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-02 14:21:13 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-02 14:21:13 -0800 |
commit | 58e2ab41a76518c433d7375a24191018b85ced85 (patch) | |
tree | 4d1fdb80c101bdc2ed0b333ea75d81ddb4fba67c | |
parent | cdcaea6f944df10941522ebcb08946bf34c357db (diff) | |
parent | c2adeeab057b825c8cccae67aac822be02293211 (diff) | |
download | kafka-python-58e2ab41a76518c433d7375a24191018b85ced85.tar.gz |
Merge pull request #473 from ecanzonieri/use_unblocking_io_for_aware_requests
Use unblocking io for broker aware requests
-rw-r--r-- | kafka/client.py | 48 | ||||
-rw-r--r-- | kafka/conn.py | 5 | ||||
-rw-r--r-- | test/test_conn.py | 17 |
3 files changed, 52 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py index c05e142..64b814b 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,6 +2,7 @@ import collections import copy import functools import logging +import select import time import kafka.common @@ -177,6 +178,10 @@ class KafkaClient(object): # For each broker, send the list of request payloads # and collect the responses and errors broker_failures = [] + + # For each KafkaConnection keep the real socket so that we can use + # a select to perform unblocking I/O + connections_by_socket = {} for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() log.debug('Request %s to %s: %s', requestId, broker, payloads) @@ -210,27 +215,34 @@ class KafkaClient(object): topic_partition = (payload.topic, payload.partition) responses[topic_partition] = None continue + else: + connections_by_socket[conn.get_connected_socket()] = (conn, broker) - try: - response = conn.recv(requestId) - except ConnectionError as e: - broker_failures.append(broker) - log.warning('ConnectionError attempting to receive a ' - 'response to request %s from server %s: %s', - requestId, broker, e) + conn = None + while connections_by_socket: + sockets = connections_by_socket.keys() + rlist, _, _ = select.select(sockets, [], [], None) + conn, broker = connections_by_socket.pop(rlist[0]) + try: + response = conn.recv(requestId) + except ConnectionError as e: + broker_failures.append(broker) + log.warning('ConnectionError attempting to receive a ' + 'response to request %s from server %s: %s', + requestId, broker, e) - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) + for payload in payloads_by_broker[broker]: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) - else: - _resps = [] - for payload_response in decoder_fn(response): - topic_partition = (payload_response.topic, - payload_response.partition) - responses[topic_partition] = payload_response - _resps.append(payload_response) - log.debug('Response %s: %s', requestId, _resps) + else: + _resps = [] + for payload_response in decoder_fn(response): + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) # Connection errors generally mean stale metadata # although sometimes it means incorrect api request diff --git a/kafka/conn.py b/kafka/conn.py index e6a1f74..9514e48 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -118,6 +118,11 @@ class KafkaConnection(local): # TODO multiplex socket communication to allow for multi-threaded clients + def get_connected_socket(self): + if not self._sock: + self.reinit() + return self._sock + def send(self, request_id, payload): """ Send a request to Kafka diff --git a/test/test_conn.py b/test/test_conn.py index 2b70344..1bdfc1e 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase): self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) + def test_get_connected_socket(self): + s = self.conn.get_connected_socket() + + self.assertEqual(s, self.MockCreateConn()) + + def test_get_connected_socket_on_dirty_conn(self): + # Dirty the connection + try: + self.conn._raise_connection_error() + except ConnectionError: + pass + + # Test that get_connected_socket tries to connect + self.assertEqual(self.MockCreateConn.call_count, 0) + self.conn.get_connected_socket() + self.assertEqual(self.MockCreateConn.call_count, 1) + def test_close__object_is_reusable(self): # test that sending to a closed connection |