author    | Dana Powers <dana.powers@gmail.com> | 2015-06-08 22:07:55 -0700
committer | Dana Powers <dana.powers@gmail.com> | 2015-06-08 22:07:55 -0700
commit    | fe382a55b253e2c0c4f66052ced1714dbdab65ae (patch)
tree      | 757bfcd9f27065abe0e9ed51e61076aa6a9767c2 /kafka/client.py
parent    | 712377ab15ae7c1c0b031df310d60235b7c57cae (diff)
parent    | 81abf094dcdfbbe78e55ee519b35658fefa410ef (diff)
download  | kafka-python-fe382a55b253e2c0c4f66052ced1714dbdab65ae.tar.gz
Merge pull request #394 from dpkp/cleanups
Cleanups
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 45
1 file changed, 23 insertions(+), 22 deletions(-)
```diff
diff --git a/kafka/client.py b/kafka/client.py
index 63b33b3..2ef22b3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,8 +4,8 @@ import copy
 import functools
 import logging
 import time
 
-import kafka.common
+import kafka.common
 from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
 
 class KafkaClient(object):
 
-    CLIENT_ID = b"kafka-python"
+    CLIENT_ID = b'kafka-python'
 
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ class KafkaClient(object):
     ##################
 
     def _get_conn(self, host, port):
-        "Get or create a connection to a broker using host and port"
+        """Get or create a connection to a broker using host and port"""
         host_key = (host, port)
         if host_key not in self.conns:
             self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ class KafkaClient(object):
         """
         for (host, port) in self.hosts:
             requestId = self._next_id()
+            log.debug('Request %s: %s', requestId, payloads)
             try:
                 conn = self._get_conn(host, port)
                 request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ class KafkaClient(object):
 
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
-                return decoder_fn(response)
+                decoded = decoder_fn(response)
+                log.debug('Response %s: %s', requestId, decoded)
+                return decoded
 
             except Exception:
-                log.exception("Could not send request [%r] to server %s:%i, "
-                              "trying next server" % (requestId, host, port))
+                log.exception('Error sending request [%s] to server %s:%s, '
+                              'trying next server', requestId, host, port)
 
-        raise KafkaUnavailableError("All servers failed to process request")
+        raise KafkaUnavailableError('All servers failed to process request')
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
@@ -150,9 +153,6 @@ class KafkaClient(object):
 
         List of response objects in the same order as the supplied payloads
         """
-
-        log.debug("Sending Payloads: %s" % payloads)
-
         # Group the requests by topic+partition
         brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ class KafkaClient(object):
         broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             requestId = self._next_id()
+            log.debug('Request %s to %s: %s', requestId, broker, payloads)
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId,
                                  payloads=payloads)
@@ -180,7 +181,7 @@ class KafkaClient(object):
 
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not send request [%s] to server %s: %s",
+                log.warning('Could not send request [%s] to server %s: %s',
                             binascii.b2a_hex(request), broker, e)
 
                 for payload in payloads:
@@ -201,15 +202,14 @@ class KafkaClient(object):
                 response = conn.recv(requestId)
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not receive response to request [%s] "
-                            "from server %s: %s",
+                log.warning('Could not receive response to request [%s] '
+                            'from server %s: %s',
                             binascii.b2a_hex(request), conn, e)
 
                 for payload in payloads:
                     responses_by_broker[broker].append(FailedPayloadsError(payload))
 
             else:
-
                 for payload_response in decoder_fn(response):
                     responses_by_broker[broker].append(payload_response)
 
@@ -223,7 +223,6 @@ class KafkaClient(object):
         # Return responses in the same order as provided
         responses_by_payload = [responses_by_broker[broker].pop(0)
                                 for broker in brokers_for_payloads]
-
         log.debug('Responses: %s' % responses_by_payload)
         return responses_by_payload
 
     def __repr__(self):
@@ -254,8 +253,11 @@ class KafkaClient(object):
 
     def copy(self):
         """
-        Create an inactive copy of the client object
-        A reinit() has to be done on the copy before it can be used again
+        Create an inactive copy of the client object, suitable for passing
+        to a separate thread.
+
+        Note that the copied connections are not initialized, so reinit() must
+        be called on the returned copy.
         """
         c = copy.deepcopy(self)
         for key in c.conns:
@@ -297,7 +299,7 @@ class KafkaClient(object):
 
         while not self.has_metadata_for_topic(topic):
             if time.time() > start_time + timeout:
-                raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+                raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
             try:
                 self.load_metadata_for_topics(topic)
             except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ class KafkaClient(object):
 
         resp = self.send_metadata_request(topics)
 
-        log.debug("Received new broker metadata: %s", resp.brokers)
-        log.debug("Received new topic metadata: %s", resp.topics)
+        log.debug('Received new broker metadata: %s', resp.brokers)
+        log.debug('Received new topic metadata: %s', resp.topics)
 
         self.brokers = dict([(broker.nodeId, broker)
                              for broker in resp.brokers])
@@ -365,7 +367,7 @@ class KafkaClient(object):
                     raise
 
                 # Otherwise, just log a warning
-                log.error("Error loading topic metadata for %s: %s", topic, type(e))
+                log.error('Error loading topic metadata for %s: %s', topic, type(e))
                 continue
 
             self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ class KafkaClient(object):
 
     def send_metadata_request(self, payloads=[], fail_on_error=True,
                               callback=None):
-
         encoder = KafkaProtocol.encode_metadata_request
         decoder = KafkaProtocol.decode_metadata_response
```
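
The substantive change in the diff above is per-request DEBUG logging: each outgoing request now logs its correlation id and payloads, and each decoded response is logged before being returned, replacing the older bulk "Sending Payloads" message. Below is a minimal usage sketch (not part of the commit) for surfacing those log lines; the broker address `localhost:9092` is a placeholder.

```python
# Sketch: enable DEBUG logging so the new 'Request %s: ...' and
# 'Response %s: ...' lines from kafka/client.py show up on the console.
# Assumes a broker is reachable at localhost:9092 (placeholder address).
import logging

from kafka import KafkaClient

logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
    level=logging.DEBUG)

client = KafkaClient('localhost:9092')

# load_metadata_for_topics() goes through _send_broker_unaware_request(),
# so each attempt logs the correlation id, the payloads, and the decoded
# response at DEBUG level; failures log 'trying next server' and fall through.
client.load_metadata_for_topics()
client.close()
```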
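The rewritten copy() docstring spells out the intended pattern: hand each worker thread its own copy of the client and call reinit() on that copy before use, since the copied connections start out uninitialized. A hypothetical sketch of that pattern follows; the names and the broker address are placeholders.

```python
# Sketch of the copy()/reinit() pattern described in the updated docstring.
# Worker name and broker address are placeholders, not part of the commit.
import threading

from kafka import KafkaClient


def worker(client_copy):
    # The copied connections are inactive until reinit() is called.
    client_copy.reinit()
    client_copy.load_metadata_for_topics()
    client_copy.close()


main_client = KafkaClient('localhost:9092')
thread = threading.Thread(target=worker, args=(main_client.copy(),))
thread.start()
thread.join()
main_client.close()
```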
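Two error paths are also touched: _send_broker_unaware_request() raises KafkaUnavailableError once every configured host has failed ("All servers failed to process request"), and ensure_topic_exists() raises KafkaTimeoutError when its timeout expires. A hedged example of handling both, with a placeholder topic name and broker address:

```python
# Sketch: handling the two error types whose messages were touched above.
# 'my-topic' and localhost:9092 are placeholders.
from kafka import KafkaClient
from kafka.common import KafkaTimeoutError, KafkaUnavailableError

try:
    client = KafkaClient('localhost:9092')
    client.ensure_topic_exists('my-topic', timeout=30)
except KafkaUnavailableError:
    # Raised by _send_broker_unaware_request() after every host has been tried.
    print('no broker could serve the metadata request')
except KafkaTimeoutError:
    # Raised by ensure_topic_exists() when the topic is still missing
    # after the timeout.
    print('topic was not created within 30 seconds')
else:
    client.close()
```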