diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..a918091 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,3 +1,4 @@ +import binascii import collections import copy import functools @@ -20,7 +21,7 @@ log = logging.getLogger("kafka") class KafkaClient(object): - CLIENT_ID = "kafka-python" + CLIENT_ID = b"kafka-python" ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the @@ -81,7 +82,7 @@ class KafkaClient(object): """ Generate a new correlation id """ - return KafkaClient.ID_GEN.next() + return next(KafkaClient.ID_GEN) def _send_broker_unaware_request(self, requestId, request): """ @@ -96,7 +97,7 @@ class KafkaClient(object): return response except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (request, host, port, e)) + "trying next server: %s" % (binascii.b2a_hex(request), host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -145,7 +146,7 @@ class KafkaClient(object): # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): - conn = self._get_conn(broker.host, broker.port) + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) @@ -160,11 +161,11 @@ class KafkaClient(object): response = conn.recv(requestId) except ConnectionError as e: log.warning("Could not receive response to request [%s] " - "from server %s: %s", request, conn, e) + "from server %s: %s", binascii.b2a_hex(request), conn, e) failed = True except ConnectionError as e: log.warning("Could not send request [%s] to server %s: %s", - request, conn, e) + binascii.b2a_hex(request), conn, e) failed = True if failed: @@ -233,8 +234,8 @@ class KafkaClient(object): A reinit() has to be done on the copy before it can be used again """ c = copy.deepcopy(self) - for k, v in c.conns.items(): - c.conns[k] = v.copy() + for key in c.conns: + c.conns[key] = self.conns[key].copy() return c def reinit(self): |