diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:15:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-08 13:17:43 -0700 |
commit | fff812ddc80243208233f785b3f005904cf33482 (patch) | |
tree | 30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /test | |
parent | 42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff) | |
parent | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff) | |
download | kafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz |
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor
* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
MetadataResponse object so that it is consistent with server api:
[broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
unless topics is null (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
PartitionUnavailable) with server standard exceptions
(LeaderNotAvailableError, UnknownTopicOrPartitionError)
Conflicts:
kafka/client.py
test/test_client.py
test/test_producer_integration.py
test/test_protocol.py
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client.py | 297 | ||||
-rw-r--r-- | test/test_producer_integration.py | 6 | ||||
-rw-r--r-- | test/test_protocol.py | 73 |
3 files changed, 255 insertions, 121 deletions
diff --git a/test/test_client.py b/test/test_client.py index 274655e..ba6e3b1 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,15 +1,17 @@ import socket from time import sleep -from mock import MagicMock, patch +from mock import ANY, MagicMock, patch import six from . import unittest from kafka import KafkaClient from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, + ProduceRequest, MetadataResponse, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - LeaderUnavailableError, PartitionUnavailableError, + LeaderNotAvailableError, NoError, + UnknownTopicOrPartitionError, KafkaTimeoutError, ConnectionError ) from kafka.conn import KafkaConnection @@ -17,6 +19,10 @@ from kafka.protocol import KafkaProtocol, create_message from test.testutil import Timer +NO_ERROR = 0 +UNKNOWN_TOPIC_OR_PARTITION = 3 +NO_LEADER = 5 + class TestKafkaClient(unittest.TestCase): def test_init_with_list(self): with patch.object(KafkaClient, 'load_metadata_for_topics'): @@ -64,10 +70,12 @@ class TestKafkaClient(unittest.TestCase): req = KafkaProtocol.encode_metadata_request(b'client', 0) with self.assertRaises(KafkaUnavailableError): - client._send_broker_unaware_request(1, req) + client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(return_value='fake encoded message'), + decoder_fn=lambda x: x) for key, conn in six.iteritems(mocked_conns): - conn.send.assert_called_with(1, req) + conn.send.assert_called_with(ANY, 'fake encoded message') def test_send_broker_unaware_request(self): 'Tests that call works when at least one of the host is available' @@ -88,40 +96,46 @@ class TestKafkaClient(unittest.TestCase): # patch to avoid making requests before we want it with patch.object(KafkaClient, 'load_metadata_for_topics'): with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): - client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + with patch.object(KafkaClient, '_next_id', return_value=1): + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') - req = KafkaProtocol.encode_metadata_request(b'client', 0) - resp = client._send_broker_unaware_request(1, req) + resp = client._send_broker_unaware_request(payloads=['fake request'], + encoder_fn=MagicMock(), + decoder_fn=lambda x: x) - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_load_metadata(self, protocol, conn): - "Load metadata for all topics" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(1, 'broker_1', 4567) - brokers[1] = BrokerMetadata(2, 'broker_2', 5678) - - topics = {} - topics['topic_1'] = { - 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) - } - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - topics['topic_no_partitions'] = {} - topics['topic_3'] = { - 0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]), - 2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_1', NO_ERROR, [ + PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR) + ]), + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), + ]), + TopicMetadata('topic_no_partitions', NO_LEADER, []), + TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_3', NO_ERROR, [ + PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR), + PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR), + PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # client loads metadata at init client = KafkaClient(hosts=['broker_1:4567']) @@ -134,6 +148,78 @@ class TestKafkaClient(unittest.TestCase): TopicAndPartition('topic_3', 2): brokers[0]}, client.topics_to_brokers) + # if we ask for metadata explicitly, it should raise errors + with self.assertRaises(LeaderNotAvailableError): + client.load_metadata_for_topics('topic_no_partitions') + + with self.assertRaises(UnknownTopicOrPartitionError): + client.load_metadata_for_topics('topic_unknown') + + # This should not raise + client.load_metadata_for_topics('topic_no_leader') + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_has_metadata_for_topic(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + # Topics with no partitions return False + self.assertFalse(client.has_metadata_for_topic('topic_still_creating')) + self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist')) + + # Topic with partition metadata, but no leaders return True + self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_ensure_topic_exists(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + with self.assertRaises(UnknownTopicOrPartitionError): + client.ensure_topic_exists('topic_doesnt_exist', timeout=1) + + with self.assertRaises(KafkaTimeoutError): + client.ensure_topic_exists('topic_still_creating', timeout=1) + + # This should not raise + client.ensure_topic_exists('topic_noleaders', timeout=1) + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): @@ -141,70 +227,84 @@ class TestKafkaClient(unittest.TestCase): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', NO_LEADER, []) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) - topics['topic_no_partitions'] = { - 0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_one_partition', NO_ERROR, [ + PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], NO_ERROR) + ]) + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) # calling _get_leader_for_partition (from any broker aware request) # will try loading metadata again for the same topic - leader = client._get_leader_for_partition('topic_no_partitions', 0) + leader = client._get_leader_for_partition('topic_one_partition', 0) self.assertEqual(brokers[0], leader) self.assertDictEqual({ - TopicAndPartition('topic_no_partitions', 0): brokers[0]}, + TopicAndPartition('topic_one_partition', 0): brokers[0]}, client.topics_to_brokers) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_unassigned_partitions(self, protocol, conn): - "Get leader raises if no partitions is defined for a topic" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] - topics = {'topic_no_partitions': {}} - protocol.decode_metadata_response.return_value = (brokers, topics) + topics = [ + TopicMetadata('topic_no_partitions', NO_LEADER, []), + TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({}, client.topics_to_brokers) - with self.assertRaises(PartitionUnavailableError): + with self.assertRaises(LeaderNotAvailableError): client._get_leader_for_partition('topic_no_partitions', 0) + with self.assertRaises(UnknownTopicOrPartitionError): + client._get_leader_for_partition('topic_unknown', 0) + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') - def test_get_leader_returns_none_when_noleader(self, protocol, conn): - "Getting leader for partitions returns None when the partiion has no leader" + def test_get_leader_exceptions_when_noleader(self, protocol, conn): conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( @@ -213,34 +313,48 @@ class TestKafkaClient(unittest.TestCase): TopicAndPartition('topic_noleader', 1): None }, client.topics_to_brokers) - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]), - 1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0]) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + # No leader partitions -- raise LeaderNotAvailableError + with self.assertRaises(LeaderNotAvailableError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) + with self.assertRaises(LeaderNotAvailableError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) + + # Unknown partitions -- raise UnknownTopicOrPartitionError + with self.assertRaises(UnknownTopicOrPartitionError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2)) + + topics = [ + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR), + PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR) + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Send producer request raises LeaderUnavailableError if leader is not available" + "Send producer request raises LeaderNotAvailableError if leader is not available" conn.recv.return_value = 'response' # anything but None - brokers = {} - brokers[0] = BrokerMetadata(0, 'broker_1', 4567) - brokers[1] = BrokerMetadata(1, 'broker_2', 5678) - - topics = {} - topics['topic_noleader'] = { - 0: PartitionMetadata('topic_noleader', 0, -1, [], []), - 1: PartitionMetadata('topic_noleader', 1, -1, [], []) - } - protocol.decode_metadata_response.return_value = (brokers, topics) + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) @@ -248,7 +362,32 @@ class TestKafkaClient(unittest.TestCase): "topic_noleader", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(LeaderUnavailableError): + with self.assertRaises(LeaderNotAvailableError): + client.send_produce_request(requests) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + requests = [ProduceRequest( + "topic_doesnt_exist", 0, + [create_message("a"), create_message("b")])] + + with self.assertRaises(UnknownTopicOrPartitionError): client.send_produce_request(requests) def test_timeout(self): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 125df2c..71fd26c 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -9,7 +9,8 @@ from kafka import ( ) from kafka.codec import has_snappy from kafka.common import ( - FetchRequest, ProduceRequest, UnknownTopicOrPartitionError + FetchRequest, ProduceRequest, + UnknownTopicOrPartitionError, LeaderNotAvailableError ) from test.fixtures import ZookeeperFixture, KafkaFixture @@ -165,7 +166,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = SimpleProducer(self.client) # At first it doesn't exist - with self.assertRaises(UnknownTopicOrPartitionError): + with self.assertRaises((UnknownTopicOrPartitionError, + LeaderNotAvailableError)): producer.send_messages(new_topic, self.msg("one")) @kafka_versions("all") diff --git a/test/test_protocol.py b/test/test_protocol.py index 11d4687..a028be9 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -10,9 +10,10 @@ from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, ProduceRequest, FetchRequest, Message, ChecksumError, - ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, - BrokerMetadata, PartitionMetadata, ProtocolError, - UnsupportedCodecError + ProduceResponse, FetchResponse, OffsetAndMessage, + BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, + KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall, + ProtocolError ) from kafka.protocol import ( ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol, @@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase): self.assertEqual(encoded, expected) - def _create_encoded_metadata_response(self, broker_data, topic_data, - topic_errors, partition_errors): - encoded = struct.pack('>ii', 3, len(broker_data)) - for node_id, broker in six.iteritems(broker_data): - encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + def _create_encoded_metadata_response(self, brokers, topics): + encoded = struct.pack('>ii', 3, len(brokers)) + for broker in brokers: + encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId, len(broker.host), broker.host, broker.port) - encoded += struct.pack('>i', len(topic_data)) - for topic, partitions in six.iteritems(topic_data): - encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], - len(topic), topic, len(partitions)) - for partition, metadata in six.iteritems(partitions): + encoded += struct.pack('>i', len(topics)) + for topic in topics: + encoded += struct.pack('>hh%dsi' % len(topic.topic), + topic.error, len(topic.topic), + topic.topic, len(topic.partitions)) + for metadata in topic.partitions: encoded += struct.pack('>hiii', - partition_errors[(topic, partition)], - partition, metadata.leader, + metadata.error, + metadata.partition, + metadata.leader, len(metadata.replicas)) if len(metadata.replicas) > 0: encoded += struct.pack('>%di' % len(metadata.replicas), @@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase): if len(metadata.isr) > 0: encoded += struct.pack('>%di' % len(metadata.isr), *metadata.isr) - return encoded def test_decode_metadata_response(self): - node_brokers = { - 0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000), - 1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001), - 3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) - } - - topic_partitions = { - b"topic1": { - 0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)), - 1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1)) - }, - b"topic2": { - 0: PartitionMetadata(b"topic2", 0, 0, (), ()) - } - } - topic_errors = {b"topic1": 0, b"topic2": 1} - partition_errors = { - (b"topic1", 0): 0, - (b"topic1", 1): 1, - (b"topic2", 0): 0 - } + node_brokers = [ + BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000), + BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001), + BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) + ] + + topic_partitions = [ + TopicMetadata(b"topic1", 0, [ + PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0), + PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1) + ]), + TopicMetadata(b"topic2", 1, [ + PartitionMetadata(b"topic2", 0, 0, (), (), 0), + ]), + ] encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions, - topic_errors, - partition_errors) + topic_partitions) decoded = KafkaProtocol.decode_metadata_response(encoded) self.assertEqual(decoded, (node_brokers, topic_partitions)) |