diff options
Diffstat (limited to 'test/test_unit.py')
-rw-r--r-- | test/test_unit.py | 136 |
1 files changed, 6 insertions, 130 deletions
diff --git a/test/test_unit.py b/test/test_unit.py index 624fe39..3239e6a 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,7 +3,8 @@ import random import struct import unittest -from mock import patch +from mock import MagicMock, patch + from kafka import KafkaClient from kafka.common import ( @@ -366,7 +367,6 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -409,26 +409,22 @@ class TestKafkaClient(unittest.TestCase): def test_send_broker_unaware_request_fail(self): 'Tests that call fails when all hosts are unavailable' - from mock import MagicMock - mocked_conns = { ('kafka01', 9092): MagicMock(), ('kafka02', 9092): MagicMock() } - # inject conns + # inject KafkaConnection side effects mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") def mock_get_conn(host, port): - print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)]) return mocked_conns[(host, port)] # patch to avoid making requests before we want it with patch.object(KafkaClient, 'load_metadata_for_topics'), \ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts=['kafka01:9092','kafka02:9092']) - + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) self.assertRaises( KafkaUnavailableError, @@ -439,22 +435,19 @@ class TestKafkaClient(unittest.TestCase): conn.send.assert_called_with(1, 'fake request') def test_send_broker_unaware_request(self): - 'Tests that call fails when one of the host is available' - - from mock import MagicMock + 'Tests that call works when at least one of the host is available' mocked_conns = { ('kafka01', 9092): MagicMock(), ('kafka02', 9092): MagicMock(), ('kafka03', 9092): MagicMock() } - # inject conns + # inject KafkaConnection side effects mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") def mock_get_conn(host, port): - print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)]) return mocked_conns[(host, port)] # patch to avoid making requests before we want it @@ -468,123 +461,6 @@ class TestKafkaClient(unittest.TestCase): self.assertEqual('valid response', resp) mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) - @unittest.skip('requires disabling recursion on load_metadata_for_topics') - @patch('kafka.client.KafkaConnection') - @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata(self, protocol, conn): - - conn.recv.return_value = 'response' # anything but None - - brokers = {} - brokers[0] = BrokerMetadata(1, 'broker_1', 4567) - brokers[1] = BrokerMetadata(2, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) - } - topics['topic_2'] = { - 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) - - client = KafkaClient(hosts='broker_1:4567') - self.assertItemsEqual( - { - TopicAndPartition('topic_1', 0): brokers[0], - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1] - }, - client.topics_to_brokers) - - @unittest.skip('requires disabling recursion on load_metadata_for_topics') - @patch('kafka.client.KafkaConnection') - @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata_unassigned_partitions(self, protocol, conn): - - conn.recv.return_value = 'response' # anything but None - - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) - - client = KafkaClient(hosts='broker_1:4567') - - self.assertItemsEqual({}, client.topics_to_brokers) - self.assertRaises( - Exception, - client._get_leader_for_partition, - 'topic_1', 0) - - # calling _get_leader_for_partition (from any broker aware request) - # will try loading metadata again for the same topic - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) - } - leader = client._get_leader_for_partition('topic_1', 0) - - self.assertEqual(brokers[0], leader) - self.assertItemsEqual( - { - TopicAndPartition('topic_1', 0): brokers[0], - }, - client.topics_to_brokers) - - @unittest.skip('requires disabling recursion on load_metadata_for_topics') - @patch('kafka.client.KafkaConnection') - @patch('kafka.client.KafkaProtocol') - def test_client_load_metadata_noleader_partitions(self, protocol, conn): - - conn.recv.return_value = 'response' # anything but None - - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, -1, [], []) - } - topics['topic_2'] = { - 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []), - 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) - - client = KafkaClient(hosts='broker_1:4567') - self.assertItemsEqual( - { - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1] - }, - client.topics_to_brokers) - self.assertRaises( - Exception, - client._get_leader_for_partition, - 'topic_1', 0) - - # calling _get_leader_for_partition (from any broker aware request) - # will try loading metadata again for the same topic - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) - } - leader = client._get_leader_for_partition('topic_1', 0) - - self.assertEqual(brokers[0], leader) - self.assertItemsEqual( - { - TopicAndPartition('topic_1', 0): brokers[0], - TopicAndPartition('topic_2', 0): brokers[0], - TopicAndPartition('topic_2', 1): brokers[1] - }, - client.topics_to_brokers) if __name__ == '__main__': unittest.main() |