summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-02-20 11:21:12 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-02-25 10:01:25 +0300
commit25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8 (patch)
treef66e3234350d92b70e9f4d4039f80e8833bb243b /test/test_protocol.py
parent9ad0be662d388b47aadf04d712f5744add6456e3 (diff)
downloadkafka-python-25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8.tar.gz
Correct message keys for async batching mode
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py66
1 files changed, 63 insertions, 3 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d20f591..0938228 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.value, payload)
def test_create_gzip(self):
- payloads = [b"v1", b"v2"]
+ payloads = [(b"v1", None), (b"v2", None)]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
@@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(decoded, expect)
+ def test_create_gzip_keyed(self):
+ payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+ msg = create_gzip_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
+ self.assertEqual(msg.key, None)
+ # Need to decode to check since gzipped payload is non-deterministic
+ decoded = gzip_decode(msg.value)
+ expect = b"".join([
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", 1474775406), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k1", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v1", # Value
+
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k2", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v2", # Value
+ ])
+
+ self.assertEqual(decoded, expect)
+
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
- payloads = [b"v1", b"v2"]
+ payloads = [(b"v1", None), (b"v2", None)]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
@@ -87,6 +117,36 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(decoded, expect)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_create_snappy_keyed(self):
+ payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+ msg = create_snappy_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
+ self.assertEqual(msg.key, None)
+ decoded = snappy_decode(msg.value)
+ expect = b"".join([
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", 1474775406), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k1", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v1", # Value
+
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k2", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v2", # Value
+ ])
+
+ self.assertEqual(decoded, expect)
+
def test_encode_message_header(self):
expect = b"".join([
struct.pack(">h", 10), # API Key
@@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase):
yield
def test_create_message_set(self):
- messages = [1, 2, 3]
+ messages = [(1, "k1"), (2, "k2"), (3, "k3")]
# Default codec is CODEC_NONE. Expect list of regular messages.
expect = [sentinel.message] * len(messages)