diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 17:33:26 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 18:59:23 -0700 |
commit | fa997e2ee105cbdacc146fd03e6cac8a5c6cef72 (patch) | |
tree | 6bc456433dac049cfdcb6689302f726903865017 /kafka/client.py | |
parent | 0dc6663d24f6b9386ac2119a4a11836391e5da65 (diff) | |
download | kafka-python-fa997e2ee105cbdacc146fd03e6cac8a5c6cef72.tar.gz |
Prefer single quotes for strings
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 26 |
1 files changed, 12 insertions, 14 deletions
diff --git a/kafka/client.py b/kafka/client.py index 20e20f2..18327ee 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -22,7 +22,7 @@ log = logging.getLogger(__name__) class KafkaClient(object): - CLIENT_ID = b"kafka-python" + CLIENT_ID = b'kafka-python' # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a @@ -50,7 +50,7 @@ class KafkaClient(object): ################## def _get_conn(self, host, port): - "Get or create a connection to a broker using host and port" + """Get or create a connection to a broker using host and port""" host_key = (host, port) if host_key not in self.conns: self.conns[host_key] = KafkaConnection( @@ -122,10 +122,10 @@ class KafkaClient(object): return decoder_fn(response) except Exception: - log.exception("Could not send request [%r] to server %s:%i, " - "trying next server" % (requestId, host, port)) + log.exception('Error sending request [%s] to server %s:%s, ' + 'trying next server', requestId, host, port) - raise KafkaUnavailableError("All servers failed to process request") + raise KafkaUnavailableError('All servers failed to process request') def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -180,7 +180,7 @@ class KafkaClient(object): except ConnectionError as e: broker_failures.append(broker) - log.warning("Could not send request [%s] to server %s: %s", + log.warning('Could not send request [%s] to server %s: %s', binascii.b2a_hex(request), broker, e) for payload in payloads: @@ -201,15 +201,14 @@ class KafkaClient(object): response = conn.recv(requestId) except ConnectionError as e: broker_failures.append(broker) - log.warning("Could not receive response to request [%s] " - "from server %s: %s", + log.warning('Could not receive response to request [%s] ' + 'from server %s: %s', binascii.b2a_hex(request), conn, e) for payload in payloads: responses_by_broker[broker].append(FailedPayloadsError(payload)) else: - for payload_response in decoder_fn(response): responses_by_broker[broker].append(payload_response) @@ -300,7 +299,7 @@ class KafkaClient(object): while not self.has_metadata_for_topic(topic): if time.time() > start_time + timeout: - raise KafkaTimeoutError("Unable to create topic {0}".format(topic)) + raise KafkaTimeoutError('Unable to create topic {0}'.format(topic)) try: self.load_metadata_for_topics(topic) except LeaderNotAvailableError: @@ -348,8 +347,8 @@ class KafkaClient(object): resp = self.send_metadata_request(topics) - log.debug("Received new broker metadata: %s", resp.brokers) - log.debug("Received new topic metadata: %s", resp.topics) + log.debug('Received new broker metadata: %s', resp.brokers) + log.debug('Received new topic metadata: %s', resp.topics) self.brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) @@ -368,7 +367,7 @@ class KafkaClient(object): raise # Otherwise, just log a warning - log.error("Error loading topic metadata for %s: %s", topic, type(e)) + log.error('Error loading topic metadata for %s: %s', topic, type(e)) continue self.topic_partitions[topic] = {} @@ -409,7 +408,6 @@ class KafkaClient(object): def send_metadata_request(self, payloads=[], fail_on_error=True, callback=None): - encoder = KafkaProtocol.encode_metadata_request decoder = KafkaProtocol.decode_metadata_response |