summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 75b05bf..22918ac 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -175,7 +175,8 @@ class SimpleClient(object):
# Block
while not future.is_done:
- conn.recv()
+ for r, f in conn.recv():
+ f.success(r)
if future.failed():
log.error("Request failed: %s", future.exception)
@@ -288,7 +289,8 @@ class SimpleClient(object):
if not future.is_done:
conn, _ = connections_by_future[future]
- conn.recv()
+ for r, f in conn.recv():
+ f.success(r)
continue
_, broker = connections_by_future.pop(future)
@@ -352,8 +354,6 @@ class SimpleClient(object):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- conn.send(request_id, request)
-
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)
@@ -365,6 +365,11 @@ class SimpleClient(object):
# No exception, try to get response
else:
+ future = conn.send(request_id, request)
+ while not future.is_done:
+ for r, f in conn.recv():
+ f.success(r)
+
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
@@ -376,18 +381,17 @@ class SimpleClient(object):
responses[topic_partition] = None
return []
- try:
- response = conn.recv(request_id)
- except ConnectionError as e:
- log.warning('ConnectionError attempting to receive a '
+ if future.failed():
+ log.warning('Error attempting to receive a '
'response to request %s from server %s: %s',
- request_id, broker, e)
+ request_id, broker, future.exception)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
else:
+ response = future.value
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,