summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py45
1 files changed, 23 insertions, 22 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 63b33b3..2ef22b3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,8 +4,8 @@ import copy
import functools
import logging
import time
-import kafka.common
+import kafka.common
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
class KafkaClient(object):
- CLIENT_ID = b"kafka-python"
+ CLIENT_ID = b'kafka-python'
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ class KafkaClient(object):
##################
def _get_conn(self, host, port):
- "Get or create a connection to a broker using host and port"
+ """Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ class KafkaClient(object):
"""
for (host, port) in self.hosts:
requestId = self._next_id()
+ log.debug('Request %s: %s', requestId, payloads)
try:
conn = self._get_conn(host, port)
request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
- return decoder_fn(response)
+ decoded = decoder_fn(response)
+ log.debug('Response %s: %s', requestId, decoded)
+ return decoded
except Exception:
- log.exception("Could not send request [%r] to server %s:%i, "
- "trying next server" % (requestId, host, port))
+ log.exception('Error sending request [%s] to server %s:%s, '
+ 'trying next server', requestId, host, port)
- raise KafkaUnavailableError("All servers failed to process request")
+ raise KafkaUnavailableError('All servers failed to process request')
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -150,9 +153,6 @@ class KafkaClient(object):
List of response objects in the same order as the supplied payloads
"""
-
- log.debug("Sending Payloads: %s" % payloads)
-
# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ class KafkaClient(object):
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
+ log.debug('Request %s to %s: %s', requestId, broker, payloads)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -180,7 +181,7 @@ class KafkaClient(object):
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not send request [%s] to server %s: %s",
+ log.warning('Could not send request [%s] to server %s: %s',
binascii.b2a_hex(request), broker, e)
for payload in payloads:
@@ -201,15 +202,14 @@ class KafkaClient(object):
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not receive response to request [%s] "
- "from server %s: %s",
+ log.warning('Could not receive response to request [%s] '
+ 'from server %s: %s',
binascii.b2a_hex(request), conn, e)
for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
else:
-
for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)
@@ -223,7 +223,6 @@ class KafkaClient(object):
# Return responses in the same order as provided
responses_by_payload = [responses_by_broker[broker].pop(0)
for broker in brokers_for_payloads]
- log.debug('Responses: %s' % responses_by_payload)
return responses_by_payload
def __repr__(self):
@@ -254,8 +253,11 @@ class KafkaClient(object):
def copy(self):
"""
- Create an inactive copy of the client object
- A reinit() has to be done on the copy before it can be used again
+ Create an inactive copy of the client object, suitable for passing
+ to a separate thread.
+
+ Note that the copied connections are not initialized, so reinit() must
+ be called on the returned copy.
"""
c = copy.deepcopy(self)
for key in c.conns:
@@ -297,7 +299,7 @@ class KafkaClient(object):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
- raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+ raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ class KafkaClient(object):
resp = self.send_metadata_request(topics)
- log.debug("Received new broker metadata: %s", resp.brokers)
- log.debug("Received new topic metadata: %s", resp.topics)
+ log.debug('Received new broker metadata: %s', resp.brokers)
+ log.debug('Received new topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -365,7 +367,7 @@ class KafkaClient(object):
raise
# Otherwise, just log a warning
- log.error("Error loading topic metadata for %s: %s", topic, type(e))
+ log.error('Error loading topic metadata for %s: %s', topic, type(e))
continue
self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ class KafkaClient(object):
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
-
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response