summaryrefslogtreecommitdiff
path: root/test/test_unit.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_unit.py')
-rw-r--r--test/test_unit.py40
1 files changed, 32 insertions, 8 deletions
diff --git a/test/test_unit.py b/test/test_unit.py
index 4c78c1b..624fe39 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,11 +5,13 @@ import unittest
from mock import patch
+from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata
)
+from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase):
class TestKafkaClient(unittest.TestCase):
+ def test_init_with_list(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_init_with_csv(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'
@@ -402,14 +424,16 @@ class TestKafkaClient(unittest.TestCase):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
- with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
- client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+ client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
- resp = client._send_broker_unaware_request(1, 'fake request')
- self.assertIsNone(resp)
+ self.assertRaises(
+ KafkaUnavailableError,
+ client._send_broker_unaware_request,
+ 1, 'fake request')
for key, conn in mocked_conns.iteritems():
conn.send.assert_called_with(1, 'fake request')
@@ -434,7 +458,7 @@ class TestKafkaClient(unittest.TestCase):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
- with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@ class TestKafkaClient(unittest.TestCase):
},
client.topics_to_brokers)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@ class TestKafkaClient(unittest.TestCase):
},
client.topics_to_brokers)
- @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+ @unittest.skip('requires disabling recursion on load_metadata_for_topics')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_client_load_metadata_noleader_partitions(self, protocol, conn):