diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 01:48:18 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:02:41 -0700 |
commit | 18ac14860791db2382c3e62715f11a6f657f265a (patch) | |
tree | 0616db85b4c8ca4bb3c9f7fb8d4c6a7ad9b63dcc /test | |
parent | eddd1436c226545237aa057c35719950702466ed (diff) | |
download | kafka-python-18ac14860791db2382c3e62715f11a6f657f265a.tar.gz |
Improve metadata protocol handling
- add MetadataRequest and MetadataResponse namedtuples
- add TopicMetadata namedtuple
- add error codes to Topic and Partition Metadata
- add KafkaClient.send_metadata_request() method
- KafkaProtocol.decode_metadata_response changed
to return a MetadataResponse object
so that it is consistent with server api: [broker_list, topic_list]
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client.py | 158 | ||||
-rw-r--r-- | test/test_protocol.py | 74 |
2 files changed, 123 insertions, 109 deletions
diff --git a/test/test_client.py b/test/test_client.py index 32a2256..58254fc 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,10 +1,11 @@ import unittest2 -from mock import MagicMock, patch +from mock import ANY, MagicMock, patch from kafka import KafkaClient from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, + ProduceRequest, MetadataResponse, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, LeaderUnavailableError, PartitionUnavailableError ) @@ -56,10 +57,12 @@ class TestKafkaClient(unittest2.TestCase): client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) with self.assertRaises(KafkaUnavailableError): - client._send_broker_unaware_request(1, 'fake request') + client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(return_value='fake encoded message'), + decoder_fn=lambda x: x) for key, conn in mocked_conns.iteritems(): - conn.send.assert_called_with(1, 'fake request') + conn.send.assert_called_with(ANY, 'fake encoded message') def test_send_broker_unaware_request(self): 'Tests that call works when at least one of the host is available' @@ -80,12 +83,15 @@ class TestKafkaClient(unittest2.TestCase): # patch to avoid making requests before we want it with patch.object(KafkaClient, 'load_metadata_for_topics'): with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + with patch.object(KafkaClient, '_next_id', return_value=1): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') - resp = client._send_broker_unaware_request(1, 'fake request') + resp = client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(), + decoder_fn=lambda x: x) - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -94,25 +100,27 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(1, 'broker_1', 4567) - brokers[1] = BrokerMetadata(2, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) - } - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - topics['topic_no_partitions'] = {} - topics['topic_3'] = { - 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), - 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_1', 0, [ + PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0) + ]), + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + TopicMetadata('topic_no_partitions', 0, []), + TopicMetadata('topic_3', 0, [ + PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0), + PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # client loads metadata at init client = KafkaClient(hosts=['broker_1:4567']) @@ -132,30 +140,35 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) - topics['topic_no_partitions'] = { - 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_one_partition', 0, [ + PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic - leader = client._get_leader_for_partition('topic_no_partitions', 0) + leader = client._get_leader_for_partition('topic_one_partition', 0) self.assertEqual(brokers[0], leader) self.assertDictEqual({ - TopicAndPartition('topic_no_partitions', 0): brokers[0]}, + TopicAndPartition('topic_one_partition', 0): brokers[0]}, client.topics_to_brokers) @patch('kafka.client.KafkaConnection') @@ -165,12 +178,15 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', 0, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) @@ -186,16 +202,18 @@ class TestKafkaClient(unittest2.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -207,31 +225,35 @@ class TestKafkaClient(unittest2.TestCase): self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0), + PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Send producer request raises LeaderUnavailableError if leader is not available" + "Send producer request raises LeaderNotAvailableError if leader is not available" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_noleader', 0, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], 0), + PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) diff --git a/test/test_protocol.py b/test/test_protocol.py index 2089f48..a92d20e 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -11,10 +11,10 @@ from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, - ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - ProtocolError, LeaderUnavailableError, PartitionUnavailableError, - UnsupportedCodecError + ProduceResponse, FetchResponse, OffsetAndMessage, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, + KafkaUnavailableError, PartitionUnavailableError, + UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError, ) from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, @@ -454,21 +454,22 @@ class TestProtocol(unittest2.TestCase): self.assertEqual(encoded, expected) - def _create_encoded_metadata_response(self, broker_data, topic_data, - topic_errors, partition_errors): - encoded = struct.pack('>ii', 3, len(broker_data)) - for node_id, broker in broker_data.iteritems(): - encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + def _create_encoded_metadata_response(self, brokers, topics): + encoded = struct.pack('>ii', 3, len(brokers)) + for broker in brokers: + encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId, len(broker.host), broker.host, broker.port) - encoded += struct.pack('>i', len(topic_data)) - for topic, partitions in topic_data.iteritems(): - encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], - len(topic), topic, len(partitions)) - for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>i', len(topics)) + for topic in topics: + encoded += struct.pack('>hh%dsi' % len(topic.topic), + topic.error, len(topic.topic), + topic.topic, len(topic.partitions)) + for metadata in topic.partitions: encoded += struct.pack('>hiii', - partition_errors[(topic, partition)], - partition, metadata.leader, + metadata.error, + metadata.partition, + metadata.leader, len(metadata.replicas)) if len(metadata.replicas) > 0: encoded += struct.pack('>%di' % len(metadata.replicas), @@ -478,35 +479,26 @@ class TestProtocol(unittest2.TestCase): if len(metadata.isr) > 0: encoded += struct.pack('>%di' % len(metadata.isr), *metadata.isr) - return encoded def test_decode_metadata_response(self): - node_brokers = { - 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), - 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), - 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) - } - - topic_partitions = { - "topic1": { - 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), - 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) - }, - "topic2": { - 0: PartitionMetadata("topic2", 0, 0, (), ()) - } - } - topic_errors = {"topic1": 0, "topic2": 1} - partition_errors = { - ("topic1", 0): 0, - ("topic1", 1): 1, - ("topic2", 0): 0 - } + node_brokers = [ + BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + ] + + topic_partitions = [ + TopicMetadata("topic1", 0, [ + PartitionMetadata("topic1", 0, 1, (0, 2), (2,), 0), + PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1), 1) + ]), + TopicMetadata("topic2", 1, [ + PartitionMetadata("topic2", 0, 0, (), (), 0), + ]), + ] encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions, - topic_errors, - partition_errors) + topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) |