path: root/test/integration.py
author     David Arthur <mumrah@gmail.com>    2013-02-22 23:09:25 -0500
committer  David Arthur <mumrah@gmail.com>    2013-04-02 20:19:30 -0400
commit     2a3d231aa61642c57537bc2128dd4f2bd30f35dd (patch)
tree       6bfdfa13b228481df9c79bcb926c2036b476b891 /test/integration.py
parent     e87c561723be25fcfa2564030367196231aa366e (diff)
download   kafka-python-2a3d231aa61642c57537bc2128dd4f2bd30f35dd.tar.gz
Protocol and low-level client done, adding tests
Diffstat (limited to 'test/integration.py')
-rw-r--r--  test/integration.py  407
1 file changed, 245 insertions(+), 162 deletions(-)
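
For orientation, here is a minimal sketch of the low-level client08 API that these new tests exercise, distilled from the diff below; the topic name and the localhost:9092 broker address are illustrative and not something this commit configures:

from kafka.client08 import *

# Assumed: a broker already reachable at localhost:9092.
client = KafkaClient("localhost", 9092)

# Produce two plain messages to partition 0 of an illustrative topic.
produce = ProduceRequest("example-topic", 0, messages=[
    KafkaProtocol.create_message("hello"),
    KafkaProtocol.create_message("world"),
])
for resp in client.send_produce_request([produce]):
    assert resp.error == 0

# Fetch them back from offset 0, reading at most 1024 bytes.
fetch = FetchRequest("example-topic", 0, 0, 1024)
(fetch_resp,) = client.send_fetch_request([fetch])
for message in fetch_resp.messages:
    print("%d: %s" % (message.offset, message.message.value))

client.close()
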
diff --git a/test/integration.py b/test/integration.py
index 3971d3f..598b17a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -11,8 +11,7 @@ from threading import Thread, Event
import time
import unittest
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
-from kafka.queue import KafkaQueue
+from kafka.client08 import *
def get_open_port():
sock = socket.socket()
@@ -27,12 +26,15 @@ def build_kafka_classpath():
jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
- jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
- return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+ jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar"))
+ cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+ cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties"))
+ return cp
class KafkaFixture(Thread):
- def __init__(self, port):
+ def __init__(self, host, port):
Thread.__init__(self)
self.port = port
self.capture = ""
@@ -57,7 +59,7 @@ class KafkaFixture(Thread):
# Start Kafka
args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
- proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
+ proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
killed = False
while True:
@@ -65,6 +67,7 @@ class KafkaFixture(Thread):
if proc.stdout in rlist:
read = proc.stdout.readline()
stdout.write(read)
+ stdout.flush()
self.capture += read
if self.shouldDie.is_set():
@@ -88,174 +91,254 @@ class KafkaFixture(Thread):
return True
time.sleep(0.100)
+ def close(self):
+ self.shouldDie.set()
-class IntegrationTest(unittest.TestCase):
+class ExternalKafkaFixture(object):
+ def __init__(self, host, port):
+ print("Using already running Kafka at %s:%d" % (host, port))
+
+ def close(self):
+ pass
+
+
+class TestKafkaClient(unittest.TestCase):
@classmethod
def setUpClass(cls):
- port = get_open_port()
- cls.server = KafkaFixture(port)
- cls.server.start()
- cls.server.wait_for("Kafka server started")
- cls.kafka = KafkaClient("localhost", port)
+ if os.environ.has_key('KAFKA_URI'):
+ parse = urlparse(os.environ['KAFKA_URI'])
+ (host, port) = (parse.hostname, parse.port)
+ cls.server = ExternalKafkaFixture(host, port)
+ cls.client = KafkaClient(host, port)
+ else:
+ port = get_open_port()
+ cls.server = KafkaFixture("localhost", port)
+ cls.server.start()
+ cls.server.wait_for("Kafka server started")
+ cls.client = KafkaClient("localhost", port)
@classmethod
def tearDownClass(cls):
- cls.kafka.close()
- cls.server.shouldDie.set()
-
- def test_send_simple(self):
- self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
- self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
-
- def test_produce(self):
- # Produce a message, check that the log got created
- req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
-
- # Same thing, different partition
- req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
-
- def _test_produce_consume(self, topic, create_func):
- # Send two messages and consume them
- message1 = create_func("testing 1")
- message2 = create_func("testing 2")
- req = ProduceRequest(topic, 0, [message1, message2])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
- self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
- req = FetchRequest(topic, 0, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0].payload, "testing 1")
- self.assertEquals(messages[1].payload, "testing 2")
-
- # Do the same, but for a different partition
- message3 = create_func("testing 3")
- message4 = create_func("testing 4")
- req = ProduceRequest(topic, 1, [message3, message4])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
- self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
- req = FetchRequest(topic, 1, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0].payload, "testing 3")
- self.assertEquals(messages[1].payload, "testing 4")
+ cls.client.close()
+ cls.server.close()
+
+ #####################
+ # Produce Tests #
+ #####################
+
+ def test_produce_many_simple(self):
+ produce = ProduceRequest("test_produce_many_simple", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 100)
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 200)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 300)
+
+ def test_produce_10k_simple(self):
+ produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 10000)
+
+ def test_produce_many_gzip(self):
+ message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+ message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ def test_produce_many_snappy(self):
+ message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+ message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ def test_produce_mixed(self):
+ message1 = KafkaProtocol.create_message("Just a plain message")
+ message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
+ message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 201)
+
+
+ def test_produce_100k_gzipped(self):
+ produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+ KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 100000)
+
+ #####################
+ # Consume Tests #
+ #####################
+
+ def test_consume_none(self):
+ fetch = FetchRequest("test_consume_none", 0, 0, 1024)
+
+ fetch_resp = self.client.send_fetch_request([fetch]).next()
+ self.assertEquals(fetch_resp.error, 0)
+ self.assertEquals(fetch_resp.topic, "test_consume_none")
+ self.assertEquals(fetch_resp.partition, 0)
+
+ messages = list(fetch_resp.messages)
+ self.assertEquals(len(messages), 0)
def test_produce_consume(self):
- self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
-
- def test_produce_consume_snappy(self):
- self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
-
- def test_produce_consume_gzip(self):
- self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
-
- def test_check_offset(self):
- # Produce/consume a message, check that the next offset looks correct
- message1 = KafkaClient.create_message("testing 1")
- req = ProduceRequest("test-check-offset", 0, [message1])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
- req = FetchRequest("test-check-offset", 0, 0, 1024)
- (messages, nextReq) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], message1)
- self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
-
- # Produce another message, consume with the last offset
- message2 = KafkaClient.create_message("test 2")
- req = ProduceRequest("test-check-offset", 0, [message2])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-
- # Verify
- (messages, nextReq) = self.kafka.get_message_set(nextReq)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], message2)
- self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
-
- def test_iterator(self):
- # Produce 100 messages
- messages = []
- for i in range(100):
- messages.append(KafkaClient.create_message("testing %d" % i))
- req = ProduceRequest("test-iterator", 0, messages)
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
-
- # Initialize an iterator of fetch size 64 bytes - big enough for one message
- # but not enough for all 100 messages
- cnt = 0
- for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
- self.assertEquals(messages[i], msg)
- cnt += 1
- self.assertEquals(cnt, 100)
-
- # Same thing, but don't auto paginate
- cnt = 0
- for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
- self.assertEquals(messages[i], msg)
- cnt += 1
- self.assertTrue(cnt < 100)
-
- def test_offset_request(self):
- # Produce a message to create the topic/partition
- message1 = KafkaClient.create_message("testing 1")
- req = ProduceRequest("test-offset-request", 0, [message1])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
-
- t1 = int(time.time()*1000) # now
- t2 = t1 + 60000 # one minute from now
- req = OffsetRequest("test-offset-request", 0, t1, 1024)
- self.kafka.get_offsets(req)
-
- req = OffsetRequest("test-offset-request", 0, t2, 1024)
- self.kafka.get_offsets(req)
-
- def test_10k_messages(self):
- msg_tmpl = "this is a test message with a few bytes in it. this is message number %d"
- # TODO 10k actually fails, why?
- msg = KafkaClient.create_gzip_message(*[msg_tmpl % i for i in range(1000)])
- req = ProduceRequest("test-10k", 0, [msg])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-10k'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-10k-0'"))
- #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
- #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
-
- def test_queue(self):
- # Send 1000 messages
- q = KafkaQueue(self.kafka, "test-queue", [0,1])
- t1 = time.time()
- for i in range(1000):
- q.put("test %d" % i)
- t2 = time.time()
+ produce = ProduceRequest("test_produce_consume", 0, messages=[
+ KafkaProtocol.create_message("Just a test message"),
+ KafkaProtocol.create_message("Message with a key", "foo"),
+ ])
- # Wait for the producer to fully flush
- time.sleep(2)
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
- # Copy all the messages into a list
- t1 = time.time()
- consumed = []
- for i in range(1000):
- consumed.append(q.get())
- t2 = time.time()
+ fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
- # Verify everything is there
- for i in range(1000):
- self.assertTrue("test %d" % i in consumed)
+ fetch_resp = self.client.send_fetch_request([fetch]).next()
+ self.assertEquals(fetch_resp.error, 0)
+
+ messages = list(fetch_resp.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].offset, 0)
+ self.assertEquals(messages[0].message.value, "Just a test message")
+ self.assertEquals(messages[0].message.key, None)
+ self.assertEquals(messages[1].offset, 1)
+ self.assertEquals(messages[1].message.value, "Message with a key")
+ self.assertEquals(messages[1].message.key, "foo")
+
+ def test_produce_consume_many(self):
+ produce = ProduceRequest("test_produce_consume_many", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # 1024 is not enough for 100 messages...
+ fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
+
+ (fetch_resp1,) = self.client.send_fetch_request([fetch1])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 100)
+ messages = list(fetch_resp1.messages)
+ self.assertTrue(len(messages) < 100)
+
+ # 10240 should be enough
+ fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240)
+ (fetch_resp2,) = self.client.send_fetch_request([fetch2])
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 100)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 100)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Test message %d" % i)
+ self.assertEquals(message.message.key, None)
+
+ def test_produce_consume_two_partitions(self):
+ produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
+ KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+ ])
+ produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
+ KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+ ])
+
+ for resp in self.client.send_produce_request([produce1, produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+ return
+
+ fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
+ fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 10)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 10)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Partition 0 %d" % i)
+ self.assertEquals(message.message.key, None)
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 10)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 10)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Partition 1 %d" % i)
+ self.assertEquals(message.message.key, None)
+
+ ####################
+ # Offset Tests #
+ ####################
+
+ def test_commit_fetch_offsets(self):
+ req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
+ (resp,) = self.client.send_offset_commit_request("group", [req])
+ self.assertEquals(resp.error, 0)
+
+ req = OffsetFetchRequest("test_commit_fetch_offsets", 0)
+ (resp,) = self.client.send_offset_fetch_request("group", [req])
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 42)
+ self.assertEquals(resp.metadata, "metadata")
+
+
+
- # Shutdown the queue
- q.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
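
As the new setUpClass shows, the suite either forks a local broker via KafkaFixture or, when a KAFKA_URI environment variable is set, attaches to an already-running one. A hedged sketch of driving it against an external broker follows; the tcp://localhost:9092 URI is only an example of something urlparse can split into hostname and port:

import os
import unittest

# Assumed: a broker already listening on localhost:9092.
os.environ["KAFKA_URI"] = "tcp://localhost:9092"

import integration  # test/integration.py from this commit
unittest.main(module=integration, argv=["integration.py"], exit=False)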