diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-09 01:26:18 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-09 01:26:18 -0700 |
commit | 853d45247eebc4c43bb31e04d5a51425b59c609c (patch) | |
tree | 4414a5ed973198b14d22dee8f364e4336510b5f3 /test/test_protocol.py | |
parent | a3c781fda5223b443ad6179b68faaf52792b158c (diff) | |
download | kafka-python-853d45247eebc4c43bb31e04d5a51425b59c609c.tar.gz |
Update more tests, fix intermittent failure
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 129 |
1 files changed, 89 insertions, 40 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 121f3d5..3b7a733 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -249,15 +249,15 @@ class TestProtocol(unittest.TestCase): struct.pack(">i", 2), # Length of value "v1", # Value - struct.pack(">q", 1), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - "k2", # Key - struct.pack(">i", 2), # Length of value - "v2", # Value - "@1$%(Y!", # Random padding + struct.pack(">q", 1), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + "k2", # Key + struct.pack(">i", 2), # Length of value + "v2", # Value + "@1$%(Y!", # Random padding ]) msgs = list(KafkaProtocol._decode_message_set_iter(encoded)) @@ -284,19 +284,59 @@ class TestProtocol(unittest.TestCase): ]) ] - expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07' - 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1' - '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00' - '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff' - '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00' - '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00' - '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01' - '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00' - '\x01c') - encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, - 2, 100) - self.assertEqual(encoded, expect) + msg_a_binary = KafkaProtocol._encode_message(create_message("a")) + msg_b_binary = KafkaProtocol._encode_message(create_message("b")) + msg_c_binary = KafkaProtocol._encode_message(create_message("c")) + + header = "".join([ + struct.pack('>i', 0x94), # The length of the message overall + struct.pack('>h', 0), # Msg Header, Message type = Produce + struct.pack('>h', 0), # Msg Header, API version + struct.pack('>i', 2), # Msg Header, Correlation ID + struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID + struct.pack('>h', 2), # Num acks required + struct.pack('>i', 100), # Request Timeout + struct.pack('>i', 2), # The number of requests + ]) + + total_len = len(msg_a_binary) + len(msg_b_binary) + topic1 = "".join([ + struct.pack('>h6s', 6, 'topic1'), # The topic1 + struct.pack('>i', 1), # One message set + struct.pack('>i', 0), # Partition 0 + struct.pack('>i', total_len + 24), # Size of the incoming message set + struct.pack('>q', 0), # No offset specified + struct.pack('>i', len(msg_a_binary)), # Length of message + msg_a_binary, # Actual message + struct.pack('>q', 0), # No offset specified + struct.pack('>i', len(msg_b_binary)), # Length of message + msg_b_binary, # Actual message + ]) + + topic2 = "".join([ + struct.pack('>h6s', 6, 'topic2'), # The topic1 + struct.pack('>i', 1), # One message set + struct.pack('>i', 1), # Partition 1 + struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set + struct.pack('>q', 0), # No offset specified + struct.pack('>i', len(msg_c_binary)), # Length of message + msg_c_binary, # Actual message + ]) + + expect1 = "".join([ + header, + topic1, + topic2 + ]) + + expect2 = "".join([ + header, + topic2, + topic1 + ]) + + encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100) + self.assertIn(encoded, [ expect1, expect2 ]) def test_decode_produce_response(self): t1 = "topic1" @@ -314,17 +354,7 @@ class TestProtocol(unittest.TestCase): requests = [FetchRequest("topic1", 0, 10, 1024), FetchRequest("topic2", 1, 20, 100)] - possibility1 = ( - '\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' - 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' - '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' - '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06' - 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00' - '\x00\x00\x14\x00\x00\x00d' - ) - - # Todo, this isn't currently different - possibility2 = ( + expected = ( '\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' @@ -334,7 +364,7 @@ class TestProtocol(unittest.TestCase): ) encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100) - self.assertIn(encoded, [ possibility1, possibility2 ]) + self.assertEqual(encoded, expected) def test_decode_fetch_response(self): t1 = "topic1" @@ -365,15 +395,34 @@ class TestProtocol(unittest.TestCase): self.assertEqual(expanded_responses, expect) def test_encode_metadata_request_no_topics(self): + expected = "".join([ + struct.pack(">i", 17), # Total length of the request + struct.pack('>h', 3), # API key metadata fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 4), # Correlation ID + struct.pack('>h3s', 3, "cid"), # The client ID + struct.pack('>i', 0), # No topics, give all the data! + ]) + encoded = KafkaProtocol.encode_metadata_request("cid", 4) - self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00' - '\x00\x04\x00\x03cid\x00\x00\x00\x00') + + self.assertEqual(encoded, expected) def test_encode_metadata_request_with_topics(self): + expected = "".join([ + struct.pack(">i", 25), # Total length of the request + struct.pack('>h', 3), # API key metadata fetch + struct.pack('>h', 0), # API version + struct.pack('>i', 4), # Correlation ID + struct.pack('>h3s', 3, "cid"), # The client ID + struct.pack('>i', 2), # Number of topics in the request + struct.pack('>h2s', 2, "t1"), # Topic "t1" + struct.pack('>h2s', 2, "t2"), # Topic "t2" + ]) + encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"]) - self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00' - '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02' - 't1\x00\x02t2') + + self.assertEqual(encoded, expected) def _create_encoded_metadata_response(self, broker_data, topic_data, topic_errors, partition_errors): @@ -408,6 +457,7 @@ class TestProtocol(unittest.TestCase): 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) } + topic_partitions = { "topic1": { 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), @@ -438,7 +488,6 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass |