summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py25
1 files changed, 12 insertions, 13 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 46955e2..ff0169b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -248,7 +248,6 @@ class SimpleClient(object):
failed_payloads(broker_payloads)
continue
-
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
@@ -348,20 +347,20 @@ class SimpleClient(object):
# Send the list of request payloads and collect the responses and
# errors
responses = {}
- requestId = self._next_id()
- log.debug('Request %s to %s: %s', requestId, broker, payloads)
+ request_id = self._next_id()
+ log.debug('Request %s to %s: %s', request_id, broker, payloads)
request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId, payloads=payloads)
+ correlation_id=request_id, payloads=payloads)
# Send the request, recv the response
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- conn.send(requestId, request)
+ conn.send(request_id, request)
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
- 'to server %s: %s', requestId, broker, e)
+ 'to server %s: %s', request_id, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
@@ -375,18 +374,18 @@ class SimpleClient(object):
# ProduceRequest w/ acks = 0
if decoder_fn is None:
log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', requestId)
+ '(skipping conn.recv)', request_id)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
return []
try:
- response = conn.recv(requestId)
+ response = conn.recv(request_id)
except ConnectionError as e:
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
- requestId, broker, e)
+ request_id, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
@@ -399,7 +398,7 @@ class SimpleClient(object):
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ log.debug('Response %s: %s', request_id, _resps)
# Return responses in the same order as provided
return [responses[tp] for tp in original_ordering]
@@ -473,8 +472,8 @@ class SimpleClient(object):
def has_metadata_for_topic(self, topic):
return (
- topic in self.topic_partitions
- and len(self.topic_partitions[topic]) > 0
+ topic in self.topic_partitions
+ and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
@@ -487,7 +486,7 @@ class SimpleClient(object):
def topics(self):
return list(self.topic_partitions.keys())
- def ensure_topic_exists(self, topic, timeout = 30):
+ def ensure_topic_exists(self, topic, timeout=30):
start_time = time.time()
while not self.has_metadata_for_topic(topic):