diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-02 18:16:41 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:50 -0800 |
commit | b6b1ba09402dbe7fe13b9c1e806faa02db42ebdb (patch) | |
tree | cc6b63f8e3cf2811cf332cf878e868e0b3ee6856 /test/test_unit.py | |
parent | 009ed92894b03a95a473359e64c5499665697b10 (diff) | |
download | kafka-python-b6b1ba09402dbe7fe13b9c1e806faa02db42ebdb.tar.gz |
Fix unit tests.
This is pretty much a rewrite. The tests that involve offset requests/responses
are not implemented since that API is not supported in kafka 0.8 yet.
Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
Diffstat (limited to 'test/test_unit.py')
-rw-r--r-- | test/test_unit.py | 464 |
1 files changed, 314 insertions, 150 deletions
diff --git a/test/test_unit.py b/test/test_unit.py index 3f3af66..93d88a1 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,13 +3,22 @@ import random import struct import unittest -from kafka.client import KafkaClient -from kafka.common import ProduceRequest, FetchRequest +from kafka.common import ( + ProduceRequest, FetchRequest, Message, ChecksumError, + ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, + OffsetAndMessage, BrokerMetadata, PartitionMetadata +) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) +from kafka.protocol import ( + create_gzip_message, + create_message, + create_snappy_message, + KafkaProtocol +) ITERATIONS = 1000 STRLEN = 100 @@ -20,16 +29,13 @@ def random_string(): class TestPackage(unittest.TestCase): - @unittest.expectedFailure + def test_top_level_namespace(self): import kafka as kafka1 self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode") - self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode") self.assertEquals(kafka1.client.__name__, "kafka.client") self.assertEquals(kafka1.codec.__name__, "kafka.codec") - @unittest.expectedFailure def test_submodule_namespace(self): import kafka.client as client1 self.assertEquals(client1.__name__, "kafka.client") @@ -48,19 +54,8 @@ class TestPackage(unittest.TestCase): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") - from kafka import gzip_encode as gzip_encode2 - self.assertEquals(gzip_encode2.__name__, "gzip_encode") - - from kafka import snappy_encode as snappy_encode2 - self.assertEquals(snappy_encode2.__name__, "snappy_encode") - - -class TestMisc(unittest.TestCase): - @unittest.expectedFailure - def test_length_prefix(self): - for i in xrange(ITERATIONS): - s1 = random_string() - self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) + from kafka.codec import snappy_encode + self.assertEquals(snappy_encode.__name__, "snappy_encode") class TestCodec(unittest.TestCase): @@ -81,140 +76,309 @@ class TestCodec(unittest.TestCase): self.assertEquals(s1, s2) -# XXX(sandello): These really should be protocol tests. -class TestMessage(unittest.TestCase): - @unittest.expectedFailure - def test_create(self): - msg = KafkaClient.create_message("testing") - self.assertEquals(msg.payload, "testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 0) - self.assertEquals(msg.crc, -386704890) +class TestProtocol(unittest.TestCase): + + def test_create_message(self): + payload = "test" + key = "key" + msg = create_message(payload, key) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, 0) + self.assertEqual(msg.key, key) + self.assertEqual(msg.value, payload) - @unittest.expectedFailure def test_create_gzip(self): - msg = KafkaClient.create_gzip_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 1) - # Can't check the crc or payload for gzip since it's non-deterministic - (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure + payloads = ["v1", "v2"] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2" + "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff" + "\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(decoded, expect) + def test_create_snappy(self): - msg = KafkaClient.create_snappy_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 2) - self.assertEquals(msg.crc, -62350868) - (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure - def test_message_simple(self): - msg = KafkaClient.create_message("testing") - enc = KafkaClient.encode_message(msg) - expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], msg) - - @unittest.expectedFailure - def test_message_list(self): - msgs = [ - KafkaClient.create_message("one"), - KafkaClient.create_message("two"), - KafkaClient.create_message("three") - ] - enc = KafkaClient.encode_message_set(msgs) - expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11" - "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three") - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_gzip(self): - msg = KafkaClient.create_gzip_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - # Can't check the bytes directly since Gzip is non-deterministic - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_snappy(self): - msg = KafkaClient.create_snappy_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_simple_random(self): - for i in xrange(ITERATIONS): - n = random.randint(0, 10) - msgs = [KafkaClient.create_message(random_string()) for j in range(n)] - enc = KafkaClient.encode_message_set(msgs) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j], msgs[j]) - - @unittest.expectedFailure - def test_message_gzip_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_gzip_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - @unittest.expectedFailure - def test_message_snappy_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_snappy_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - -class TestRequests(unittest.TestCase): - @unittest.expectedFailure - def test_produce_request(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) - enc = KafkaClient.encode_produce_request(req) - expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - - @unittest.expectedFailure - def test_fetch_request(self): - req = FetchRequest("my-topic", 0, 0, 1024) - enc = KafkaClient.encode_fetch_request(req) - expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00" - self.assertEquals(enc, expect) + payloads = ["v1", "v2"] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.key, None) + expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" + "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" + "\xff\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(msg.value, expect) + + def test_encode_message_header(self): + expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3' + encoded = KafkaProtocol._encode_message_header("client3", 4, 10) + self.assertEqual(encoded, expect) + + def test_encode_message(self): + message = create_message("test", "key") + encoded = KafkaProtocol._encode_message(message) + expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + self.assertEqual(encoded, expect) + + def test_encode_message_failure(self): + self.assertRaises(Exception, KafkaProtocol._encode_message, + Message(1, 0, "key", "test")) + + def test_encode_message_set(self): + message_set = [create_message("v1", "k1"), create_message("v2", "k2")] + encoded = KafkaProtocol._encode_message_set(message_set) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00" + "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00" + "\x00\x00\x02k2\x00\x00\x00\x02v2") + self.assertEqual(encoded, expect) + + def test_decode_message(self): + encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + offset = 10 + (returned_offset, decoded_message) = \ + list(KafkaProtocol._decode_message(encoded, offset))[0] + self.assertEqual(returned_offset, offset) + self.assertEqual(decoded_message, create_message("test", "key")) + + def test_decode_message_set(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded) + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_gzip(self): + gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' + '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' + '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8' + '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' + '\x00') + offset = 11 + decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_snappy(self): + snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' + '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' + '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') + offset = 11 + decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_checksum_error(self): + invalid_encoded_message = "This is not a valid encoded message" + iter = KafkaProtocol._decode_message(invalid_encoded_message, 0) + self.assertRaises(ChecksumError, list, iter) + + # NOTE: The error handling in _decode_message_set_iter() is questionable. + # If it's modified, the next two tests might need to be fixed. + def test_decode_message_set_fetch_size_too_small(self): + iter = KafkaProtocol._decode_message_set_iter('a') + self.assertRaises(ConsumerFetchSizeTooSmall, list, iter) + + def test_decode_message_set_stop_iteration(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!") + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_encode_produce_request(self): + requests = [ProduceRequest("topic1", 0, [create_message("a"), + create_message("b")]), + ProduceRequest("topic2", 1, [create_message("c")])] + expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07' + 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1' + '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff' + '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00' + '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01' + '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00' + '\x01c') + encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, + 2, 100) + self.assertEqual(encoded, expect) + + def test_decode_produce_response(self): + t1 = "topic1" + t2 = "topic2" + encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), + 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L, + len(t2), t2, 1, 0, 0, 30L) + responses = list(KafkaProtocol.decode_produce_response(encoded)) + self.assertEqual(responses, + [ProduceResponse(t1, 0, 0, 10L), + ProduceResponse(t1, 1, 1, 20L), + ProduceResponse(t2, 0, 0, 30L)]) + + def test_encode_fetch_request(self): + requests = [FetchRequest("topic1", 0, 10, 1024), + FetchRequest("topic2", 1, 20, 100)] + expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' + 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' + '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06' + 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00' + '\x00\x00\x14\x00\x00\x00d') + encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, + 100) + self.assertEqual(encoded, expect) + + def test_decode_fetch_response(self): + t1 = "topic1" + t2 = "topic2" + msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"]) + ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]]) + ms2 = KafkaProtocol._encode_message_set([msgs[2]]) + ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]]) + + encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % + (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), + 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30, + len(ms3), ms3) + + responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): + return FetchResponse(response.topic, response.partition, + response.error, response.highwaterMark, + list(response.messages)) + + expanded_responses = map(expand_messages, responses) + expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]), + OffsetAndMessage(0, msgs[1])]), + FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]), + FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]), + OffsetAndMessage(0, msgs[4])])] + self.assertEqual(expanded_responses, expect) + + def test_encode_metadata_request_no_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4) + self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x00') + + def test_encode_metadata_request_with_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"]) + self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02' + 't1\x00\x02t2') + + def _create_encoded_metadata_response(self, broker_data, topic_data, + topic_errors, partition_errors): + encoded = struct.pack('>ii', 3, len(broker_data)) + for node_id, broker in broker_data.iteritems(): + encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + len(broker.host), broker.host, broker.port) + + encoded += struct.pack('>i', len(topic_data)) + for topic, partitions in topic_data.iteritems(): + encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], + len(topic), topic, len(partitions)) + for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>hiii', + partition_errors[(topic, partition)], + partition, metadata.leader, + len(metadata.replicas)) + if len(metadata.replicas) > 0: + encoded += struct.pack('>%di' % len(metadata.replicas), + *metadata.replicas) + + encoded += struct.pack('>i', len(metadata.isr)) + if len(metadata.isr) > 0: + encoded += struct.pack('>%di' % len(metadata.isr), + *metadata.isr) + + return encoded + + def test_decode_metadata_response(self): + node_brokers = { + 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + } + topic_partitions = { + "topic1": { + 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), + 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) + }, + "topic2": { + 0: PartitionMetadata("topic2", 0, 0, (), ()) + } + } + topic_errors = {"topic1": 0, "topic2": 1} + partition_errors = { + ("topic1", 0): 0, + ("topic1", 1): 1, + ("topic2", 0): 0 + } + encoded = self._create_encoded_metadata_response(node_brokers, + topic_partitions, + topic_errors, + partition_errors) + decoded = KafkaProtocol.decode_metadata_response(encoded) + self.assertEqual(decoded, (node_brokers, topic_partitions)) + + @unittest.skip("Not Implemented") + def test_encode_offset_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_response(self): + pass + + + @unittest.skip("Not Implemented") + def test_encode_offset_commit_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_commit_response(self): + pass + + @unittest.skip("Not Implemented") + def test_encode_offset_fetch_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_fetch_response(self): + pass if __name__ == '__main__': |