diff options
author | Mark Roberts <markroberts@kixeye.com> | 2014-03-18 20:19:25 -0700 |
---|---|---|
committer | Mark Roberts <markroberts@kixeye.com> | 2014-03-18 20:19:25 -0700 |
commit | a6fc260f288ac639070783a0f6faa94bd7612c67 (patch) | |
tree | 8c123df8e2f5c3afab5184f0d6d6f2f9ba8f5aa8 | |
parent | 888f206d5417e95f26de407b28fe935950aea2c9 (diff) | |
parent | 9599215bf28b65a29908b8644dcaa6f3614a425d (diff) | |
download | kafka-python-a6fc260f288ac639070783a0f6faa94bd7612c67.tar.gz |
Merge branch 'master' into conn_refactor
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/conn.py | 4 | ||||
-rw-r--r-- | test/test_unit.py | 10 |
3 files changed, 15 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py index 12452de..29ee69b 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -83,14 +83,14 @@ class KafkaClient(object): brokers. Keep trying until you succeed. """ for (host, port) in self.hosts: - conn = self._get_conn(host, port) try: + conn = self._get_conn(host, port) conn.send(requestId, request) response = conn.recv(requestId) return response except Exception, e: - log.warning("Could not send request [%r] to server %s, " - "trying next server: %s" % (request, conn, e)) + log.warning("Could not send request [%r] to server %s:%i, " + "trying next server: %s" % (request, host, port, e)) continue raise KafkaUnavailableError("All servers failed to process request") diff --git a/kafka/conn.py b/kafka/conn.py index 7dcd726..4fdeb17 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -15,11 +15,11 @@ DEFAULT_KAFKA_PORT = 9092 def collect_hosts(hosts, randomize=True): """ - Collects a comma-separated set of hosts (host:port) and optionnaly + Collects a comma-separated set of hosts (host:port) and optionally randomize the returned list. """ - if isinstance(hosts, str): + if isinstance(hosts, basestring): hosts = hosts.strip().split(',') result = [] diff --git a/test/test_unit.py b/test/test_unit.py index aec0a2c..081acc7 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -449,6 +449,16 @@ class TestKafkaClient(unittest.TestCase): [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], client.hosts) + def test_init_with_unicode_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + def test_send_broker_unaware_request_fail(self): 'Tests that call fails when all hosts are unavailable' |