summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka.py2
-rw-r--r--test.py15
2 files changed, 15 insertions, 2 deletions
diff --git a/kafka.py b/kafka.py
index 902a71e..80abed3 100644
--- a/kafka.py
+++ b/kafka.py
@@ -201,7 +201,7 @@ class KafkaClient(object):
The request-key (0) is encoded as a short (int16). len is the length of the proceeding MessageSet
"""
(topic, partition, messages) = produceRequest
- message_set = self.encode_message_set(messages)
+ message_set = cls.encode_message_set(messages)
log.debug("Sending MessageSet: %r" % message_set)
req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)),
KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
diff --git a/test.py b/test.py
index 56f1e1e..5c48f27 100644
--- a/test.py
+++ b/test.py
@@ -3,7 +3,7 @@ import random
import struct
import unittest
-from kafka import KafkaClient
+from kafka import KafkaClient, ProduceRequest, FetchRequest
from kafka import gzip_encode, gzip_decode, length_prefix_message
ITERATIONS = 1000
@@ -102,5 +102,18 @@ class TestMessage(unittest.TestCase):
for j in range(n):
self.assertEquals(messages[j].payload, strings[j])
+class TestRequests(unittest.TestCase):
+ def test_produce_request(self):
+ req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
+ enc = KafkaClient.encode_produce_request(req)
+ expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
+ self.assertEquals(enc, expect)
+
+ def test_fetch_request(self):
+ req = FetchRequest("my-topic", 0, 0, 1024)
+ enc = KafkaClient.encode_fetch_request(req)
+ expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00"
+ self.assertEquals(enc, expect)
+
if __name__ == '__main__':
unittest.main()