diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-15 21:52:58 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-15 21:52:58 -0700 |
commit | e3b1ad24b80dd60e3159566740f40fc6f5811070 (patch) | |
tree | ce09520d349f62cee520cb51ee83f6c6864b378e /kafka/client.py | |
parent | 0d2164431f8245359c417473fd84e7af034f1306 (diff) | |
download | kafka-python-pending_completions.tar.gz |
Move callback processing from BrokerConnection to KafkaClientpending_completions
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index 75b05bf..22918ac 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -175,7 +175,8 @@ class SimpleClient(object): # Block while not future.is_done: - conn.recv() + for r, f in conn.recv(): + f.success(r) if future.failed(): log.error("Request failed: %s", future.exception) @@ -288,7 +289,8 @@ class SimpleClient(object): if not future.is_done: conn, _ = connections_by_future[future] - conn.recv() + for r, f in conn.recv(): + f.success(r) continue _, broker = connections_by_future.pop(future) @@ -352,8 +354,6 @@ class SimpleClient(object): try: host, port, afi = get_ip_port_afi(broker.host) conn = self._get_conn(host, broker.port, afi) - conn.send(request_id, request) - except ConnectionError as e: log.warning('ConnectionError attempting to send request %s ' 'to server %s: %s', request_id, broker, e) @@ -365,6 +365,11 @@ class SimpleClient(object): # No exception, try to get response else: + future = conn.send(request_id, request) + while not future.is_done: + for r, f in conn.recv(): + f.success(r) + # decoder_fn=None signal that the server is expected to not # send a response. This probably only applies to # ProduceRequest w/ acks = 0 @@ -376,18 +381,17 @@ class SimpleClient(object): responses[topic_partition] = None return [] - try: - response = conn.recv(request_id) - except ConnectionError as e: - log.warning('ConnectionError attempting to receive a ' + if future.failed(): + log.warning('Error attempting to receive a ' 'response to request %s from server %s: %s', - request_id, broker, e) + request_id, broker, future.exception) for payload in payloads: topic_partition = (payload.topic, payload.partition) responses[topic_partition] = FailedPayloadsError(payload) else: + response = future.value _resps = [] for payload_response in decoder_fn(response): topic_partition = (payload_response.topic, |