summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Roberts <markroberts@kixeye.com>2014-03-18 20:19:25 -0700
committerMark Roberts <markroberts@kixeye.com>2014-03-18 20:19:25 -0700
commita6fc260f288ac639070783a0f6faa94bd7612c67 (patch)
tree8c123df8e2f5c3afab5184f0d6d6f2f9ba8f5aa8
parent888f206d5417e95f26de407b28fe935950aea2c9 (diff)
parent9599215bf28b65a29908b8644dcaa6f3614a425d (diff)
downloadkafka-python-a6fc260f288ac639070783a0f6faa94bd7612c67.tar.gz
Merge branch 'master' into conn_refactor
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/conn.py4
-rw-r--r--test/test_unit.py10
3 files changed, 15 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 12452de..29ee69b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -83,14 +83,14 @@ class KafkaClient(object):
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
- conn = self._get_conn(host, port)
try:
+ conn = self._get_conn(host, port)
conn.send(requestId, request)
response = conn.recv(requestId)
return response
except Exception, e:
- log.warning("Could not send request [%r] to server %s, "
- "trying next server: %s" % (request, conn, e))
+ log.warning("Could not send request [%r] to server %s:%i, "
+ "trying next server: %s" % (request, host, port, e))
continue
raise KafkaUnavailableError("All servers failed to process request")
diff --git a/kafka/conn.py b/kafka/conn.py
index 7dcd726..4fdeb17 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -15,11 +15,11 @@ DEFAULT_KAFKA_PORT = 9092
def collect_hosts(hosts, randomize=True):
"""
- Collects a comma-separated set of hosts (host:port) and optionnaly
+ Collects a comma-separated set of hosts (host:port) and optionally
randomize the returned list.
"""
- if isinstance(hosts, str):
+ if isinstance(hosts, basestring):
hosts = hosts.strip().split(',')
result = []
diff --git a/test/test_unit.py b/test/test_unit.py
index aec0a2c..081acc7 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -449,6 +449,16 @@ class TestKafkaClient(unittest.TestCase):
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
+ def test_init_with_unicode_csv(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'