diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2017-02-09 12:27:16 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-02-09 12:27:16 -0800 |
commit | 8fde79dbb5a3793b1a9ebd10e032d5f3dd535645 (patch) | |
tree | a991daae07aa142d936b37a2af7f55030355357b /kafka/client.py | |
parent | e825483d49bda41f13420311cbc9ffd59f7cee3d (diff) | |
download | kafka-python-8fde79dbb5a3793b1a9ebd10e032d5f3dd535645.tar.gz |
PEP-8: Spacing & removed unused imports (#899)
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/kafka/client.py b/kafka/client.py index 46955e2..ff0169b 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -248,7 +248,6 @@ class SimpleClient(object): failed_payloads(broker_payloads) continue - host, port, afi = get_ip_port_afi(broker.host) try: conn = self._get_conn(host, broker.port, afi) @@ -348,20 +347,20 @@ class SimpleClient(object): # Send the list of request payloads and collect the responses and # errors responses = {} - requestId = self._next_id() - log.debug('Request %s to %s: %s', requestId, broker, payloads) + request_id = self._next_id() + log.debug('Request %s to %s: %s', request_id, broker, payloads) request = encoder_fn(client_id=self.client_id, - correlation_id=requestId, payloads=payloads) + correlation_id=request_id, payloads=payloads) # Send the request, recv the response try: host, port, afi = get_ip_port_afi(broker.host) conn = self._get_conn(host, broker.port, afi) - conn.send(requestId, request) + conn.send(request_id, request) except ConnectionError as e: log.warning('ConnectionError attempting to send request %s ' - 'to server %s: %s', requestId, broker, e) + 'to server %s: %s', request_id, broker, e) for payload in payloads: topic_partition = (payload.topic, payload.partition) @@ -375,18 +374,18 @@ class SimpleClient(object): # ProduceRequest w/ acks = 0 if decoder_fn is None: log.debug('Request %s does not expect a response ' - '(skipping conn.recv)', requestId) + '(skipping conn.recv)', request_id) for payload in payloads: topic_partition = (payload.topic, payload.partition) responses[topic_partition] = None return [] try: - response = conn.recv(requestId) + response = conn.recv(request_id) except ConnectionError as e: log.warning('ConnectionError attempting to receive a ' 'response to request %s from server %s: %s', - requestId, broker, e) + request_id, broker, e) for payload in payloads: topic_partition = (payload.topic, payload.partition) @@ -399,7 +398,7 @@ class SimpleClient(object): payload_response.partition) responses[topic_partition] = payload_response _resps.append(payload_response) - log.debug('Response %s: %s', requestId, _resps) + log.debug('Response %s: %s', request_id, _resps) # Return responses in the same order as provided return [responses[tp] for tp in original_ordering] @@ -473,8 +472,8 @@ class SimpleClient(object): def has_metadata_for_topic(self, topic): return ( - topic in self.topic_partitions - and len(self.topic_partitions[topic]) > 0 + topic in self.topic_partitions + and len(self.topic_partitions[topic]) > 0 ) def get_partition_ids_for_topic(self, topic): @@ -487,7 +486,7 @@ class SimpleClient(object): def topics(self): return list(self.topic_partitions.keys()) - def ensure_topic_exists(self, topic, timeout = 30): + def ensure_topic_exists(self, topic, timeout=30): start_time = time.time() while not self.has_metadata_for_topic(topic): |