summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-09 01:26:18 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-09 01:26:18 -0700
commit853d45247eebc4c43bb31e04d5a51425b59c609c (patch)
tree4414a5ed973198b14d22dee8f364e4336510b5f3 /test/test_protocol.py
parenta3c781fda5223b443ad6179b68faaf52792b158c (diff)
downloadkafka-python-853d45247eebc4c43bb31e04d5a51425b59c609c.tar.gz
Update more tests, fix intermittent failure
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py129
1 files changed, 89 insertions, 40 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 121f3d5..3b7a733 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -249,15 +249,15 @@ class TestProtocol(unittest.TestCase):
struct.pack(">i", 2), # Length of value
"v1", # Value
- struct.pack(">q", 1), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- "k2", # Key
- struct.pack(">i", 2), # Length of value
- "v2", # Value
- "@1$%(Y!", # Random padding
+ struct.pack(">q", 1), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ "k2", # Key
+ struct.pack(">i", 2), # Length of value
+ "v2", # Value
+ "@1$%(Y!", # Random padding
])
msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
@@ -284,19 +284,59 @@ class TestProtocol(unittest.TestCase):
])
]
- expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07'
- 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1'
- '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00'
- '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff'
- '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00'
- '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00'
- '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01'
- '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00'
- '\x01c')
- encoded = KafkaProtocol.encode_produce_request("client1", 2, requests,
- 2, 100)
- self.assertEqual(encoded, expect)
+ msg_a_binary = KafkaProtocol._encode_message(create_message("a"))
+ msg_b_binary = KafkaProtocol._encode_message(create_message("b"))
+ msg_c_binary = KafkaProtocol._encode_message(create_message("c"))
+
+ header = "".join([
+ struct.pack('>i', 0x94), # The length of the message overall
+ struct.pack('>h', 0), # Msg Header, Message type = Produce
+ struct.pack('>h', 0), # Msg Header, API version
+ struct.pack('>i', 2), # Msg Header, Correlation ID
+ struct.pack('>h7s', 7, "client1"), # Msg Header, The client ID
+ struct.pack('>h', 2), # Num acks required
+ struct.pack('>i', 100), # Request Timeout
+ struct.pack('>i', 2), # The number of requests
+ ])
+
+ total_len = len(msg_a_binary) + len(msg_b_binary)
+ topic1 = "".join([
+ struct.pack('>h6s', 6, 'topic1'), # The topic1
+ struct.pack('>i', 1), # One message set
+ struct.pack('>i', 0), # Partition 0
+ struct.pack('>i', total_len + 24), # Size of the incoming message set
+ struct.pack('>q', 0), # No offset specified
+ struct.pack('>i', len(msg_a_binary)), # Length of message
+ msg_a_binary, # Actual message
+ struct.pack('>q', 0), # No offset specified
+ struct.pack('>i', len(msg_b_binary)), # Length of message
+ msg_b_binary, # Actual message
+ ])
+
+ topic2 = "".join([
+ struct.pack('>h6s', 6, 'topic2'), # The topic1
+ struct.pack('>i', 1), # One message set
+ struct.pack('>i', 1), # Partition 1
+ struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
+ struct.pack('>q', 0), # No offset specified
+ struct.pack('>i', len(msg_c_binary)), # Length of message
+ msg_c_binary, # Actual message
+ ])
+
+ expect1 = "".join([
+ header,
+ topic1,
+ topic2
+ ])
+
+ expect2 = "".join([
+ header,
+ topic2,
+ topic1
+ ])
+
+ encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, 2, 100)
+ self.assertIn(encoded, [ expect1, expect2 ])
def test_decode_produce_response(self):
t1 = "topic1"
@@ -314,17 +354,7 @@ class TestProtocol(unittest.TestCase):
requests = [FetchRequest("topic1", 0, 10, 1024),
FetchRequest("topic2", 1, 20, 100)]
- possibility1 = (
- '\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
- 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
- '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
- '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
- 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
- '\x00\x00\x14\x00\x00\x00d'
- )
-
- # Todo, this isn't currently different
- possibility2 = (
+ expected = (
'\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
'\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
@@ -334,7 +364,7 @@ class TestProtocol(unittest.TestCase):
)
encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
- self.assertIn(encoded, [ possibility1, possibility2 ])
+ self.assertEqual(encoded, expected)
def test_decode_fetch_response(self):
t1 = "topic1"
@@ -365,15 +395,34 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(expanded_responses, expect)
def test_encode_metadata_request_no_topics(self):
+ expected = "".join([
+ struct.pack(">i", 17), # Total length of the request
+ struct.pack('>h', 3), # API key metadata fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, "cid"), # The client ID
+ struct.pack('>i', 0), # No topics, give all the data!
+ ])
+
encoded = KafkaProtocol.encode_metadata_request("cid", 4)
- self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00'
- '\x00\x04\x00\x03cid\x00\x00\x00\x00')
+
+ self.assertEqual(encoded, expected)
def test_encode_metadata_request_with_topics(self):
+ expected = "".join([
+ struct.pack(">i", 25), # Total length of the request
+ struct.pack('>h', 3), # API key metadata fetch
+ struct.pack('>h', 0), # API version
+ struct.pack('>i', 4), # Correlation ID
+ struct.pack('>h3s', 3, "cid"), # The client ID
+ struct.pack('>i', 2), # Number of topics in the request
+ struct.pack('>h2s', 2, "t1"), # Topic "t1"
+ struct.pack('>h2s', 2, "t2"), # Topic "t2"
+ ])
+
encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
- self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00'
- '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02'
- 't1\x00\x02t2')
+
+ self.assertEqual(encoded, expected)
def _create_encoded_metadata_response(self, broker_data, topic_data,
topic_errors, partition_errors):
@@ -408,6 +457,7 @@ class TestProtocol(unittest.TestCase):
1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
}
+
topic_partitions = {
"topic1": {
0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
@@ -438,7 +488,6 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_response(self):
pass
-
@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass