diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
commit | 19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e (patch) | |
tree | 421d3e2d628e8b564eecde6a4efcd4edac31d1ff /test/test_unit.py | |
parent | 828133cff064f4f8fba753183ac21619355ac005 (diff) | |
parent | 32edabdaaff6746e4926cc897b4bba917a80cb54 (diff) | |
download | kafka-python-19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e.tar.gz |
Merge branch 'master' into develop
Conflicts:
test/test_unit.py
Diffstat (limited to 'test/test_unit.py')
-rw-r--r-- | test/test_unit.py | 151 |
1 files changed, 134 insertions, 17 deletions
diff --git a/test/test_unit.py b/test/test_unit.py index 0b2b339..47ec561 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,11 +3,15 @@ import random import struct import unittest +from mock import MagicMock, patch + +from kafka import KafkaClient from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, - TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError + TopicAndPartition, KafkaUnavailableError, + LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -17,10 +21,6 @@ from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) -from kafka.client import KafkaClient - -from mock import patch - ITERATIONS = 1000 STRLEN = 100 @@ -55,6 +55,7 @@ class TestPackage(unittest.TestCase): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") + from kafka.codec import snappy_encode self.assertEquals(snappy_encode.__name__, "snappy_encode") @@ -74,6 +75,49 @@ class TestCodec(unittest.TestCase): s2 = snappy_decode(snappy_encode(s1)) self.assertEquals(s1, s2) + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_detect_xerial(self): + import kafka as kafka1 + _detect_xerial_stream = kafka1.codec._detect_xerial_stream + + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + short_data = b'\x01\x02\x03\x04' + + self.assertTrue(_detect_xerial_stream(header)) + self.assertFalse(_detect_xerial_stream(b'')) + self.assertFalse(_detect_xerial_stream(b'\x00')) + self.assertFalse(_detect_xerial_stream(false_header)) + self.assertFalse(_detect_xerial_stream(random_snappy)) + self.assertFalse(_detect_xerial_stream(short_data)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_decode_xerial(self): + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + block_len = len(random_snappy) + random_snappy2 = snappy_encode('XERIAL' * 50) + block_len2 = len(random_snappy2) + + to_test = header \ + + struct.pack('!i', block_len) + random_snappy \ + + struct.pack('!i', block_len2) + random_snappy2 \ + + self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_encode_xerial(self): + to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + + to_test = ('SNAPPY' * 50) + ('XERIAL' * 50) + + compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) + self.assertEquals(compressed, to_ensure) class TestProtocol(unittest.TestCase): @@ -91,9 +135,8 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual( - msg.attributes, - KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) @@ -108,9 +151,8 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual( - msg.attributes, - KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" @@ -281,7 +323,6 @@ class TestProtocol(unittest.TestCase): len(ms3), ms3) responses = list(KafkaProtocol.decode_fetch_response(encoded)) - def expand_messages(response): return FetchResponse(response.topic, response.partition, response.error, response.highwaterMark, @@ -369,6 +410,7 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass + @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -388,6 +430,81 @@ class TestProtocol(unittest.TestCase): class TestKafkaClient(unittest.TestCase): + def test_init_with_list(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_init_with_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts='kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_send_broker_unaware_request_fail(self): + 'Tests that call fails when all hosts are unavailable' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + + self.assertRaises( + KafkaUnavailableError, + client._send_broker_unaware_request, + 1, 'fake request') + + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') + + def test_send_broker_unaware_request(self): + 'Tests that call works when at least one of the host is available' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock(), + ('kafka03', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' + mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_load_metadata(self, protocol, conn): @@ -416,7 +533,7 @@ class TestKafkaClient(unittest.TestCase): protocol.decode_metadata_response.return_value = (brokers, topics) # client loads metadata at init - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({ TopicAndPartition('topic_1', 0): brokers[1], TopicAndPartition('topic_noleader', 0): None, @@ -440,7 +557,7 @@ class TestKafkaClient(unittest.TestCase): topics = {'topic_no_partitions': {}} protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) # topic metadata is loaded but empty self.assertDictEqual({}, client.topics_to_brokers) @@ -473,7 +590,7 @@ class TestKafkaClient(unittest.TestCase): topics = {'topic_no_partitions': {}} protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual({}, client.topics_to_brokers) self.assertRaises( @@ -499,7 +616,7 @@ class TestKafkaClient(unittest.TestCase): } protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) self.assertDictEqual( { TopicAndPartition('topic_noleader', 0): None, @@ -535,7 +652,7 @@ class TestKafkaClient(unittest.TestCase): } protocol.decode_metadata_response.return_value = (brokers, topics) - client = KafkaClient(host='broker_1', port=4567) + client = KafkaClient(hosts=['broker_1:4567']) requests = [ProduceRequest( "topic_noleader", 0, |