diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-07 18:52:05 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-07 19:09:32 -0700 |
commit | 715425c639a476139065689afde3d255a07d6f96 (patch) | |
tree | 0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /kafka/client.py | |
parent | a99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff) | |
parent | be23042ecd9ab330886745ccc9ec9e3a0039836f (diff) | |
download | kafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz |
Merge pull request #227 from wizzat-feature/py3
Python 3 Support
Conflicts:
kafka/producer.py
test/test_client.py
test/test_client_integration.py
test/test_codec.py
test/test_consumer.py
test/test_consumer_integration.py
test/test_failover_integration.py
test/test_producer.py
test/test_producer_integration.py
test/test_protocol.py
test/test_util.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..a918091 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,3 +1,4 @@ +import binascii import collections import copy import functools @@ -20,7 +21,7 @@ log = logging.getLogger("kafka") class KafkaClient(object): - CLIENT_ID = "kafka-python" + CLIENT_ID = b"kafka-python" ID_GEN = itertools.count() # NOTE: The timeout given to the client should always be greater than the @@ -81,7 +82,7 @@ class KafkaClient(object): """ Generate a new correlation id """ - return KafkaClient.ID_GEN.next() + return next(KafkaClient.ID_GEN) def _send_broker_unaware_request(self, requestId, request): """ @@ -96,7 +97,7 @@ class KafkaClient(object): return response except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (request, host, port, e)) + "trying next server: %s" % (binascii.b2a_hex(request), host, port, e)) raise KafkaUnavailableError("All servers failed to process request") @@ -145,7 +146,7 @@ class KafkaClient(object): # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): - conn = self._get_conn(broker.host, broker.port) + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) @@ -160,11 +161,11 @@ class KafkaClient(object): response = conn.recv(requestId) except ConnectionError as e: log.warning("Could not receive response to request [%s] " - "from server %s: %s", request, conn, e) + "from server %s: %s", binascii.b2a_hex(request), conn, e) failed = True except ConnectionError as e: log.warning("Could not send request [%s] to server %s: %s", - request, conn, e) + binascii.b2a_hex(request), conn, e) failed = True if failed: @@ -233,8 +234,8 @@ class KafkaClient(object): A reinit() has to be done on the copy before it can be used again """ c = copy.deepcopy(self) - for k, v in c.conns.items(): - c.conns[k] = v.copy() + for key in c.conns: + c.conns[key] = self.conns[key].copy() return c def reinit(self): |