diff options
Diffstat (limited to 'test/test_client.py')
-rw-r--r-- | test/test_client.py | 158 |
1 files changed, 90 insertions, 68 deletions
diff --git a/test/test_client.py b/test/test_client.py index 32a2256..58254fc 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,10 +1,11 @@ import unittest2 -from mock import MagicMock, patch +from mock import ANY, MagicMock, patch from kafka import KafkaClient from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, + ProduceRequest, MetadataResponse, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, LeaderUnavailableError, PartitionUnavailableError ) @@ -56,10 +57,12 @@ class TestKafkaClient(unittest2.TestCase): client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) with self.assertRaises(KafkaUnavailableError): - client._send_broker_unaware_request(1, 'fake request') + client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(return_value='fake encoded message'), + decoder_fn=lambda x: x) for key, conn in mocked_conns.iteritems(): - conn.send.assert_called_with(1, 'fake request') + conn.send.assert_called_with(ANY, 'fake encoded message') def test_send_broker_unaware_request(self): 'Tests that call works when at least one of the host is available' @@ -80,12 +83,15 @@ class TestKafkaClient(unittest2.TestCase): # patch to avoid making requests before we want it with patch.object(KafkaClient, 'load_metadata_for_topics'): with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + with patch.object(KafkaClient, '_next_id', return_value=1): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') - resp = client._send_broker_unaware_request(1, 'fake request') + resp = client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(), + decoder_fn=lambda x: x) - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -94,25 +100,27 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(1, 'broker_1', 4567) - brokers[1] = BrokerMetadata(2, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) - } - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - topics['topic_no_partitions'] = {} - topics['topic_3'] = { - 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), - 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_1', 0, [ + PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0) + ]), + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + TopicMetadata('topic_no_partitions', 0, []), + TopicMetadata('topic_3', 0, [ + PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0), + PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # client loads metadata at init client = KafkaClient(hosts=['broker_1:4567']) @@ -132,30 +140,35 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) - topics['topic_no_partitions'] = { - 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_one_partition', 0, [ + PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic - leader = client._get_leader_for_partition('topic_no_partitions', 0) + leader = client._get_leader_for_partition('topic_one_partition', 0) self.assertEqual(brokers[0], leader) self.assertDictEqual({ - TopicAndPartition('topic_no_partitions', 0): brokers[0]}, + TopicAndPartition('topic_one_partition', 0): brokers[0]}, client.topics_to_brokers) @patch('kafka.client.KafkaConnection') @@ -165,12 +178,15 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) @@ -186,16 +202,18 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -207,31 +225,35 @@ class TestKafkaClient(unittest2.TestCase): self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Send producer request raises LeaderUnavailableError if leader is not available" + "Send producer request raises LeaderNotAvailableError if leader is not available" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) |