summaryrefslogtreecommitdiff
path: root/test/test_unit.py
diff options
context:
space:
mode:
authorOmar <omar.ghishan@rd.io>2014-02-26 16:55:28 -0800
committerOmar <omar.ghishan@rd.io>2014-02-26 16:55:28 -0800
commitab89a44ecf9c93b116fcc8516cfc21749df74507 (patch)
tree5e318c4c7541bade1da687d2cf5fd145c594dc2b /test/test_unit.py
parente5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff)
parent51910f981843dfa967d24659cdb46117210c832d (diff)
downloadkafka-python-ab89a44ecf9c93b116fcc8516cfc21749df74507.tar.gz
Merge pull request #122 from mrtheb/multihosts
Support for multiple hosts on KafkaClient boostrap (improves on #70)
Diffstat (limited to 'test/test_unit.py')
-rw-r--r--test/test_unit.py84
1 files changed, 83 insertions, 1 deletions
diff --git a/test/test_unit.py b/test/test_unit.py
index b5f0118..aec0a2c 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,11 +3,16 @@ import random
import struct
import unittest
+from mock import MagicMock, patch
+
+
+from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata
)
+from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -405,7 +410,6 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_response(self):
pass
-
@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
@@ -423,5 +427,83 @@ class TestProtocol(unittest.TestCase):
pass
+class TestKafkaClient(unittest.TestCase):
+
+ def test_init_with_list(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_init_with_csv(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_send_broker_unaware_request_fail(self):
+ 'Tests that call fails when all hosts are unavailable'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock()
+ }
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+ self.assertRaises(
+ KafkaUnavailableError,
+ client._send_broker_unaware_request,
+ 1, 'fake request')
+
+ for key, conn in mocked_conns.iteritems():
+ conn.send.assert_called_with(1, 'fake request')
+
+ def test_send_broker_unaware_request(self):
+ 'Tests that call works when at least one of the host is available'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock(),
+ ('kafka03', 9092): MagicMock()
+ }
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+ mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+ resp = client._send_broker_unaware_request(1, 'fake request')
+
+ self.assertEqual('valid response', resp)
+ mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+
if __name__ == '__main__':
unittest.main()