diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-20 11:21:12 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-25 10:01:25 +0300 |
commit | 25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8 (patch) | |
tree | f66e3234350d92b70e9f4d4039f80e8833bb243b /test/test_protocol.py | |
parent | 9ad0be662d388b47aadf04d712f5744add6456e3 (diff) | |
download | kafka-python-25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8.tar.gz |
Correct message keys for async batching mode
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r-- | test/test_protocol.py | 66 |
1 files changed, 63 insertions, 3 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index d20f591..0938228 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.value, payload) def test_create_gzip(self): - payloads = [b"v1", b"v2"] + payloads = [(b"v1", None), (b"v2", None)] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) @@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase): self.assertEqual(decoded, expect) + def test_create_gzip_keyed(self): + payloads = [(b"v1", b"k1"), (b"v2", b"k2")] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = b"".join([ + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", 1474775406), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k1", # Key + struct.pack(">i", 2), # Length of value + b"v1", # Value + + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k2", # Key + struct.pack(">i", 2), # Length of value + b"v2", # Value + ]) + + self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_create_snappy(self): - payloads = [b"v1", b"v2"] + payloads = [(b"v1", None), (b"v2", None)] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) @@ -87,6 +117,36 @@ class TestProtocol(unittest.TestCase): self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_create_snappy_keyed(self): + payloads = [(b"v1", b"k1"), (b"v2", b"k2")] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) + self.assertEqual(msg.key, None) + decoded = snappy_decode(msg.value) + expect = b"".join([ + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", 1474775406), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k1", # Key + struct.pack(">i", 2), # Length of value + b"v1", # Value + + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k2", # Key + struct.pack(">i", 2), # Length of value + b"v2", # Value + ]) + + self.assertEqual(decoded, expect) + def test_encode_message_header(self): expect = b"".join([ struct.pack(">h", 10), # API Key @@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase): yield def test_create_message_set(self): - messages = [1, 2, 3] + messages = [(1, "k1"), (2, "k2"), (3, "k3")] # Default codec is CODEC_NONE. Expect list of regular messages. expect = [sentinel.message] * len(messages) |