diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-20 11:21:12 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-25 10:01:25 +0300 |
commit | 25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8 (patch) | |
tree | f66e3234350d92b70e9f4d4039f80e8833bb243b | |
parent | 9ad0be662d388b47aadf04d712f5744add6456e3 (diff) | |
download | kafka-python-25ad88cbe68c816cf41ae12d6d6bfc7c2a0926e8.tar.gz |
Correct message keys for async batching mode
-rw-r--r-- | kafka/producer/base.py | 4 | ||||
-rw-r--r-- | kafka/protocol.py | 6 | ||||
-rw-r--r-- | test/test_producer_integration.py | 16 | ||||
-rw-r--r-- | test/test_protocol.py | 66 |
4 files changed, 76 insertions, 16 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 5b41bc9..4d56134 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -62,7 +62,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Adjust the timeout to match the remaining period count -= 1 timeout = send_at - time.time() - msgset[topic_partition].append(msg) + msgset[topic_partition].append((msg, key)) # Send collected requests upstream reqs = [] @@ -191,7 +191,7 @@ class Producer(object): self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = [] else: - messages = create_message_set(msg, self.codec, key) + messages = create_message_set([(m, key) for m in msg], self.codec, key) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, diff --git a/kafka/protocol.py b/kafka/protocol.py index 2a39de6..b34a95d 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None): """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload, key) for payload in payloads]) + [create_message(payload, pl_key) for payload, pl_key in payloads]) gzipped = gzip_encode(message_set) codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP @@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None): """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload, key) for payload in payloads]) + [create_message(payload, pl_key) for payload, pl_key in payloads]) snapped = snappy_encode(message_set) codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY @@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None): return a list containing a single codec-encoded message. """ if codec == CODEC_NONE: - return [create_message(m, key) for m in messages] + return [create_message(m, k) for m, k in messages] elif codec == CODEC_GZIP: return [create_gzip_message(messages, key)] elif codec == CODEC_SNAPPY: diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 38df69f..1804af0 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -71,9 +71,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) message1 = create_gzip_message([ - ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)]) + (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)]) message2 = create_gzip_message([ - ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)]) + (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)]) self.assert_produce_request( [ message1, message2 ], @@ -87,8 +87,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ - create_snappy_message(["Snappy 1 %d" % i for i in range(100)]), - create_snappy_message(["Snappy 2 %d" % i for i in range(100)]), + create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]), + create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]), ], start_offset, 200, @@ -102,13 +102,13 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): messages = [ create_message(b"Just a plain message"), create_gzip_message([ - ("Gzipped %d" % i).encode('utf-8') for i in range(100)]), + (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]), ] # All snappy integration tests fail with nosnappyjava if False and has_snappy(): msg_count += 100 - messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)])) + messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)])) self.assert_produce_request(messages, start_offset, msg_count) @@ -118,7 +118,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request([ create_gzip_message([ - ("Gzipped batch 1, message %d" % i).encode('utf-8') + (("Gzipped batch 1, message %d" % i).encode('utf-8'), None) for i in range(50000)]) ], start_offset, @@ -127,7 +127,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request([ create_gzip_message([ - ("Gzipped batch 1, message %d" % i).encode('utf-8') + (("Gzipped batch 1, message %d" % i).encode('utf-8'), None) for i in range(50000)]) ], start_offset+50000, diff --git a/test/test_protocol.py b/test/test_protocol.py index d20f591..0938228 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase): self.assertEqual(msg.value, payload) def test_create_gzip(self): - payloads = [b"v1", b"v2"] + payloads = [(b"v1", None), (b"v2", None)] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) @@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase): self.assertEqual(decoded, expect) + def test_create_gzip_keyed(self): + payloads = [(b"v1", b"k1"), (b"v2", b"k2")] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = b"".join([ + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", 1474775406), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k1", # Key + struct.pack(">i", 2), # Length of value + b"v1", # Value + + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k2", # Key + struct.pack(">i", 2), # Length of value + b"v2", # Value + ]) + + self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_create_snappy(self): - payloads = [b"v1", b"v2"] + payloads = [(b"v1", None), (b"v2", None)] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) @@ -87,6 +117,36 @@ class TestProtocol(unittest.TestCase): self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_create_snappy_keyed(self): + payloads = [(b"v1", b"k1"), (b"v2", b"k2")] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) + self.assertEqual(msg.key, None) + decoded = snappy_decode(msg.value) + expect = b"".join([ + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", 1474775406), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k1", # Key + struct.pack(">i", 2), # Length of value + b"v1", # Value + + struct.pack(">q", 0), # MsgSet Offset + struct.pack(">i", 18), # Msg Size + struct.pack(">i", -16383415), # CRC + struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 2), # Length of key + b"k2", # Key + struct.pack(">i", 2), # Length of value + b"v2", # Value + ]) + + self.assertEqual(decoded, expect) + def test_encode_message_header(self): expect = b"".join([ struct.pack(">h", 10), # API Key @@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase): yield def test_create_message_set(self): - messages = [1, 2, 3] + messages = [(1, "k1"), (2, "k2"), (3, "k3")] # Default codec is CODEC_NONE. Expect list of regular messages. expect = [sentinel.message] * len(messages) |