summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-07 18:52:05 -0700
committerDana Powers <dana.powers@rd.io>2014-09-07 19:09:32 -0700
commit715425c639a476139065689afde3d255a07d6f96 (patch)
tree0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /kafka/client.py
parenta99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff)
parentbe23042ecd9ab330886745ccc9ec9e3a0039836f (diff)
downloadkafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz
Merge pull request #227 from wizzat-feature/py3
Python 3 Support Conflicts: kafka/producer.py test/test_client.py test/test_client_integration.py test/test_codec.py test/test_consumer.py test/test_consumer_integration.py test/test_failover_integration.py test/test_producer.py test/test_producer_integration.py test/test_protocol.py test/test_util.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py17
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8630f66..a918091 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,3 +1,4 @@
+import binascii
import collections
import copy
import functools
@@ -20,7 +21,7 @@ log = logging.getLogger("kafka")
class KafkaClient(object):
- CLIENT_ID = "kafka-python"
+ CLIENT_ID = b"kafka-python"
ID_GEN = itertools.count()
# NOTE: The timeout given to the client should always be greater than the
@@ -81,7 +82,7 @@ class KafkaClient(object):
"""
Generate a new correlation id
"""
- return KafkaClient.ID_GEN.next()
+ return next(KafkaClient.ID_GEN)
def _send_broker_unaware_request(self, requestId, request):
"""
@@ -96,7 +97,7 @@ class KafkaClient(object):
return response
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (request, host, port, e))
+ "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -145,7 +146,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn(broker.host, broker.port)
+ conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -160,11 +161,11 @@ class KafkaClient(object):
response = conn.recv(requestId)
except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
- "from server %s: %s", request, conn, e)
+ "from server %s: %s", binascii.b2a_hex(request), conn, e)
failed = True
except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
- request, conn, e)
+ binascii.b2a_hex(request), conn, e)
failed = True
if failed:
@@ -233,8 +234,8 @@ class KafkaClient(object):
A reinit() has to be done on the copy before it can be used again
"""
c = copy.deepcopy(self)
- for k, v in c.conns.items():
- c.conns[k] = v.copy()
+ for key in c.conns:
+ c.conns[key] = self.conns[key].copy()
return c
def reinit(self):