diff options
Diffstat (limited to 'test/test_client.py')
-rw-r--r-- | test/test_client.py | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/test/test_client.py b/test/test_client.py index abda421..bab7916 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -117,21 +117,21 @@ class TestKafkaClient(unittest.TestCase): ] topics = [ - TopicMetadata('topic_1', NO_ERROR, [ - PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR) + TopicMetadata(b'topic_1', NO_ERROR, [ + PartitionMetadata(b'topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR) ]), - TopicMetadata('topic_noleader', NO_ERROR, [ - PartitionMetadata('topic_noleader', 0, -1, [], [], + TopicMetadata(b'topic_noleader', NO_ERROR, [ + PartitionMetadata(b'topic_noleader', 0, -1, [], [], NO_LEADER), - PartitionMetadata('topic_noleader', 1, -1, [], [], + PartitionMetadata(b'topic_noleader', 1, -1, [], [], NO_LEADER), ]), - TopicMetadata('topic_no_partitions', NO_LEADER, []), - TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), - TopicMetadata('topic_3', NO_ERROR, [ - PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR), - PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR), - PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR) + TopicMetadata(b'topic_no_partitions', NO_LEADER, []), + TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata(b'topic_3', NO_ERROR, [ + PartitionMetadata(b'topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR), + PartitionMetadata(b'topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR), + PartitionMetadata(b'topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR) ]) ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -139,12 +139,12 @@ class TestKafkaClient(unittest.TestCase): # client loads metadata at init client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({ - TopicAndPartition('topic_1', 0): brokers[1], - TopicAndPartition('topic_noleader', 0): None, - TopicAndPartition('topic_noleader', 1): None, - TopicAndPartition('topic_3', 0): brokers[0], - TopicAndPartition('topic_3', 1): brokers[1], - TopicAndPartition('topic_3', 2): brokers[0]}, + TopicAndPartition(b'topic_1', 0): brokers[1], + TopicAndPartition(b'topic_noleader', 0): None, + TopicAndPartition(b'topic_noleader', 1): None, + TopicAndPartition(b'topic_3', 0): brokers[0], + TopicAndPartition(b'topic_3', 1): brokers[1], + TopicAndPartition(b'topic_3', 2): brokers[0]}, client.topics_to_brokers) # if we ask for metadata explicitly, it should raise errors @@ -156,6 +156,7 @@ class TestKafkaClient(unittest.TestCase): # This should not raise client.load_metadata_for_topics('topic_no_leader') + client.load_metadata_for_topics(b'topic_no_leader') @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -169,11 +170,11 @@ class TestKafkaClient(unittest.TestCase): ] topics = [ - TopicMetadata('topic_still_creating', NO_LEADER, []), - TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), - TopicMetadata('topic_noleaders', NO_ERROR, [ - PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), - PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + TopicMetadata(b'topic_still_creating', NO_LEADER, []), + TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata(b'topic_noleaders', NO_ERROR, [ + PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER), ]), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -188,8 +189,8 @@ class TestKafkaClient(unittest.TestCase): self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) @patch('kafka.client.KafkaConnection') - @patch('kafka.client.KafkaProtocol') - def test_ensure_topic_exists(self, protocol, conn): + @patch('kafka.client.KafkaProtocol.decode_metadata_response') + def test_ensure_topic_exists(self, decode_metadata_response, conn): conn.recv.return_value = 'response' # anything but None @@ -199,14 +200,14 @@ class TestKafkaClient(unittest.TestCase): ] topics = [ - TopicMetadata('topic_still_creating', NO_LEADER, []), - TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), - TopicMetadata('topic_noleaders', NO_ERROR, [ - PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), - PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + TopicMetadata(b'topic_still_creating', NO_LEADER, []), + TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata(b'topic_noleaders', NO_ERROR, [ + PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER), ]), ] - protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + decode_metadata_response.return_value = MetadataResponse(brokers, topics) client = KafkaClient(hosts=['broker_1:4567']) @@ -218,6 +219,7 @@ class TestKafkaClient(unittest.TestCase): # This should not raise client.ensure_topic_exists('topic_noleaders', timeout=1) + client.ensure_topic_exists(b'topic_noleaders', timeout=1) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -269,8 +271,8 @@ class TestKafkaClient(unittest.TestCase): ] topics = [ - TopicMetadata('topic_no_partitions', NO_LEADER, []), - TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata(b'topic_no_partitions', NO_LEADER, []), + TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -279,10 +281,10 @@ class TestKafkaClient(unittest.TestCase): self.assertDictEqual({}, client.topics_to_brokers) with self.assertRaises(LeaderNotAvailableError): - client._get_leader_for_partition('topic_no_partitions', 0) + client._get_leader_for_partition(b'topic_no_partitions', 0) with self.assertRaises(UnknownTopicOrPartitionError): - client._get_leader_for_partition('topic_unknown', 0) + client._get_leader_for_partition(b'topic_unknown', 0) @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') |