summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-08 17:33:26 -0700
committerDana Powers <dana.powers@rd.io>2015-06-08 18:59:23 -0700
commitfa997e2ee105cbdacc146fd03e6cac8a5c6cef72 (patch)
tree6bc456433dac049cfdcb6689302f726903865017 /kafka/client.py
parent0dc6663d24f6b9386ac2119a4a11836391e5da65 (diff)
downloadkafka-python-fa997e2ee105cbdacc146fd03e6cac8a5c6cef72.tar.gz
Prefer single quotes for strings
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py26
1 files changed, 12 insertions, 14 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 20e20f2..18327ee 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
class KafkaClient(object):
- CLIENT_ID = b"kafka-python"
+ CLIENT_ID = b'kafka-python'
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ class KafkaClient(object):
##################
def _get_conn(self, host, port):
- "Get or create a connection to a broker using host and port"
+ """Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
@@ -122,10 +122,10 @@ class KafkaClient(object):
return decoder_fn(response)
except Exception:
- log.exception("Could not send request [%r] to server %s:%i, "
- "trying next server" % (requestId, host, port))
+ log.exception('Error sending request [%s] to server %s:%s, '
+ 'trying next server', requestId, host, port)
- raise KafkaUnavailableError("All servers failed to process request")
+ raise KafkaUnavailableError('All servers failed to process request')
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -180,7 +180,7 @@ class KafkaClient(object):
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not send request [%s] to server %s: %s",
+ log.warning('Could not send request [%s] to server %s: %s',
binascii.b2a_hex(request), broker, e)
for payload in payloads:
@@ -201,15 +201,14 @@ class KafkaClient(object):
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not receive response to request [%s] "
- "from server %s: %s",
+ log.warning('Could not receive response to request [%s] '
+ 'from server %s: %s',
binascii.b2a_hex(request), conn, e)
for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
else:
-
for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)
@@ -300,7 +299,7 @@ class KafkaClient(object):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
- raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+ raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
@@ -348,8 +347,8 @@ class KafkaClient(object):
resp = self.send_metadata_request(topics)
- log.debug("Received new broker metadata: %s", resp.brokers)
- log.debug("Received new topic metadata: %s", resp.topics)
+ log.debug('Received new broker metadata: %s', resp.brokers)
+ log.debug('Received new topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -368,7 +367,7 @@ class KafkaClient(object):
raise
# Otherwise, just log a warning
- log.error("Error loading topic metadata for %s: %s", topic, type(e))
+ log.error('Error loading topic metadata for %s: %s', topic, type(e))
continue
self.topic_partitions[topic] = {}
@@ -409,7 +408,6 @@ class KafkaClient(object):
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
-
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response