summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-10-21 21:20:38 +0900
committerGitHub <noreply@github.com>2017-10-21 21:20:38 +0900
commit146b893e0fbac21150f74a8ba2f17cc64e1714ad (patch)
tree071b27882c76ae44103d6047c83c818a3b9582b8 /kafka/client.py
parent0bd5d2ab5738065df410ec2f9381844b28fe7425 (diff)
parente3b1ad24b80dd60e3159566740f40fc6f5811070 (diff)
downloadkafka-python-146b893e0fbac21150f74a8ba2f17cc64e1714ad.tar.gz
Merge pull request #1258 from dpkp/pending_completions
Move callback processing from BrokerConnection to KafkaClient
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 75b05bf..22918ac 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -175,7 +175,8 @@ class SimpleClient(object):
# Block
while not future.is_done:
- conn.recv()
+ for r, f in conn.recv():
+ f.success(r)
if future.failed():
log.error("Request failed: %s", future.exception)
@@ -288,7 +289,8 @@ class SimpleClient(object):
if not future.is_done:
conn, _ = connections_by_future[future]
- conn.recv()
+ for r, f in conn.recv():
+ f.success(r)
continue
_, broker = connections_by_future.pop(future)
@@ -352,8 +354,6 @@ class SimpleClient(object):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- conn.send(request_id, request)
-
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)
@@ -365,6 +365,11 @@ class SimpleClient(object):
# No exception, try to get response
else:
+ future = conn.send(request_id, request)
+ while not future.is_done:
+ for r, f in conn.recv():
+ f.success(r)
+
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
@@ -376,18 +381,17 @@ class SimpleClient(object):
responses[topic_partition] = None
return []
- try:
- response = conn.recv(request_id)
- except ConnectionError as e:
- log.warning('ConnectionError attempting to receive a '
+ if future.failed():
+ log.warning('Error attempting to receive a '
'response to request %s from server %s: %s',
- request_id, broker, e)
+ request_id, broker, future.exception)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
else:
+ response = future.value
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,