summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py7
-rw-r--r--test/integration.py50
2 files changed, 51 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 0cde87f..ad80773 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -454,7 +454,6 @@ class KafkaClient(object):
if sent == 0:
raise RuntimeError("Kafka went away")
-
def send_multi_message_set(self, produceRequests):
"""
Send a MultiProduceRequest
@@ -550,8 +549,8 @@ class KafkaClient(object):
<offset> ::= <int64>
"""
- req = length_prefix_message(encode_offset_request(offsetRequest))
- log.debug("Sending %d bytes to Kafka", len(req))
+ req = length_prefix_message(self.encode_offset_request(offsetRequest))
+ log.debug("Sending OffsetRequest of %d bytes to Kafka", len(req))
sent = self._sock.send(req)
if sent == 0:
raise RuntimeError("Kafka went away")
@@ -574,7 +573,7 @@ class KafkaClient(object):
topic: string
payloads: strings
"""
- messages = tuple([create_message(payload) for payload in payloads])
+ messages = tuple([self.create_message(payload) for payload in payloads])
self.send_message_set(ProduceRequest(topic, -1, messages))
def iter_messages(self, topic, partition, offset, size, auto=True):
diff --git a/test/integration.py b/test/integration.py
index 03e988e..dea3f2a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -1,4 +1,5 @@
import glob
+import logging
import os
import select
import shlex
@@ -11,7 +12,7 @@ from threading import Thread, Event
import time
import unittest
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest
+from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
def get_open_port():
sock = socket.socket()
@@ -70,7 +71,7 @@ class KafkaFixture(Thread):
killed = True
if proc.poll() is not None:
- #shutil.rmtree(self.tmpDir)
+ shutil.rmtree(self.tmpDir)
if killed:
break
else:
@@ -101,6 +102,11 @@ class IntegrationTest(unittest.TestCase):
cls.kafka.close()
cls.server.shouldDie.set()
+ def test_send_simple(self):
+ self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
+ self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
+
def test_produce(self):
# Produce a message, check that the log got created
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
@@ -164,6 +170,46 @@ class IntegrationTest(unittest.TestCase):
self.assertEquals(messages[0], message2)
self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
+ def test_iterator(self):
+ # Produce 100 messages
+ messages = []
+ for i in range(100):
+ messages.append(KafkaClient.create_message("testing %d" % i))
+ req = ProduceRequest("test-iterator", 0, messages)
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
+
+ # Initialize an iterator of fetch size 64 bytes - big enough for one message
+ # but not enough for all 100 messages
+ cnt = 0
+ for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
+ self.assertEquals(messages[i], msg)
+ cnt += 1
+ self.assertEquals(cnt, 100)
+
+ # Same thing, but don't auto paginate
+ cnt = 0
+ for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
+ self.assertEquals(messages[i], msg)
+ cnt += 1
+ self.assertTrue(cnt < 100)
+
+ def test_offset_request(self):
+ # Produce a message to create the topic/partition
+ message1 = KafkaClient.create_message("testing 1")
+ req = ProduceRequest("test-offset-request", 0, [message1])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
+
+ t1 = int(time.time()*1000) # now
+ t2 = t1 + 60000 # one minute from now
+ req = OffsetRequest("test-offset-request", 0, t1, 1024)
+ print self.kafka.get_offsets(req)
+
+ req = OffsetRequest("test-offset-request", 0, t2, 1024)
+ print self.kafka.get_offsets(req)
if __name__ == "__main__":
unittest.main()