Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 407
1 file changed, 245 insertions, 162 deletions
diff --git a/test/integration.py b/test/integration.py
index 3971d3f..598b17a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -11,8 +11,7 @@ from threading import Thread, Event
 import time
 import unittest

-from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
-from kafka.queue import KafkaQueue
+from kafka.client08 import *

 def get_open_port():
     sock = socket.socket()
@@ -27,12 +26,15 @@ def build_kafka_classpath():
     jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
-    jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
-    return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+    jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar"))
+    jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar"))
+    cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+    cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties"))
+    return cp

 class KafkaFixture(Thread):
-    def __init__(self, port):
+    def __init__(self, host, port):
         Thread.__init__(self)
         self.port = port
         self.capture = ""
@@ -57,7 +59,7 @@ class KafkaFixture(Thread):

         # Start Kafka
         args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
-        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
+        proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})

         killed = False
         while True:
@@ -65,6 +67,7 @@
             if proc.stdout in rlist:
                 read = proc.stdout.readline()
                 stdout.write(read)
+                stdout.flush()
                 self.capture += read

             if self.shouldDie.is_set():
@@ -88,174 +91,254 @@ class KafkaFixture(Thread):
                 return True
             time.sleep(0.100)

+    def close(self):
+        self.shouldDie.set()

-class IntegrationTest(unittest.TestCase):
+class ExternalKafkaFixture(object):
+    def __init__(self, host, port):
+        print("Using already running Kafka at %s:%d" % (host, port))
+
+    def close(self):
+        pass
+
+
+class TestKafkaClient(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        port = get_open_port()
-        cls.server = KafkaFixture(port)
-        cls.server.start()
-        cls.server.wait_for("Kafka server started")
-        cls.kafka = KafkaClient("localhost", port)
+        if os.environ.has_key('KAFKA_URI'):
+            parse = urlparse(os.environ['KAFKA_URI'])
+            (host, port) = (parse.hostname, parse.port)
+            cls.server = ExternalKafkaFixture(host, port)
+            cls.client = KafkaClient(host, port)
+        else:
+            port = get_open_port()
+            cls.server = KafkaFixture("localhost", port)
+            cls.server.start()
+            cls.server.wait_for("Kafka server started")
+            cls.client = KafkaClient("localhost", port)

     @classmethod
     def tearDownClass(cls):
-        cls.kafka.close()
-        cls.server.shouldDie.set()
-
-    def test_send_simple(self):
-        self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
-        self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
-
-    def test_produce(self):
-        # Produce a message, check that the log got created
-        req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
-
-        # Same thing, different partition
-        req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
-
-    def _test_produce_consume(self, topic, create_func):
-        # Send two messages and consume them
-        message1 = create_func("testing 1")
-        message2 = create_func("testing 2")
-        req = ProduceRequest(topic, 0, [message1, message2])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
-        self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
-        req = FetchRequest(topic, 0, 0, 1024)
-        (messages, req) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 2)
-        self.assertEquals(messages[0].payload, "testing 1")
-        self.assertEquals(messages[1].payload, "testing 2")
-
-        # Do the same, but for a different partition
-        message3 = create_func("testing 3")
-        message4 = create_func("testing 4")
-        req = ProduceRequest(topic, 1, [message3, message4])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
-        self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
-        req = FetchRequest(topic, 1, 0, 1024)
-        (messages, req) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 2)
-        self.assertEquals(messages[0].payload, "testing 3")
-        self.assertEquals(messages[1].payload, "testing 4")
+        cls.client.close()
+        cls.server.close()
+
+    #####################
+    #   Produce Tests   #
+    #####################
+
+    def test_produce_many_simple(self):
+        produce = ProduceRequest("test_produce_many_simple", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 100)
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 100)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 200)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 300)
+
+    def test_produce_10k_simple(self):
+        produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 10000)
+
+    def test_produce_many_gzip(self):
+        message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+        message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+    def test_produce_many_snappy(self):
+        message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+        message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+    def test_produce_mixed(self):
+        message1 = KafkaProtocol.create_message("Just a plain message")
+        message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
+        message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 201)
+
+
+    def test_produce_100k_gzipped(self):
+        produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+            KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 100000)
+
+    #####################
+    #   Consume Tests   #
+    #####################
+
+    def test_consume_none(self):
+        fetch = FetchRequest("test_consume_none", 0, 0, 1024)
+
+        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        self.assertEquals(fetch_resp.error, 0)
+        self.assertEquals(fetch_resp.topic, "test_consume_none")
+        self.assertEquals(fetch_resp.partition, 0)
+
+        messages = list(fetch_resp.messages)
+        self.assertEquals(len(messages), 0)

     def test_produce_consume(self):
-        self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
-
-    def test_produce_consume_snappy(self):
-        self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
-
-    def test_produce_consume_gzip(self):
-        self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
-
-    def test_check_offset(self):
-        # Produce/consume a message, check that the next offset looks correct
-        message1 = KafkaClient.create_message("testing 1")
-        req = ProduceRequest("test-check-offset", 0, [message1])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-        req = FetchRequest("test-check-offset", 0, 0, 1024)
-        (messages, nextReq) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 1)
-        self.assertEquals(messages[0], message1)
-        self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
-
-        # Produce another message, consume with the last offset
-        message2 = KafkaClient.create_message("test 2")
-        req = ProduceRequest("test-check-offset", 0, [message2])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-
-        # Verify
-        (messages, nextReq) = self.kafka.get_message_set(nextReq)
-        self.assertEquals(len(messages), 1)
-        self.assertEquals(messages[0], message2)
-        self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
-
-    def test_iterator(self):
-        # Produce 100 messages
-        messages = []
-        for i in range(100):
-            messages.append(KafkaClient.create_message("testing %d" % i))
-        req = ProduceRequest("test-iterator", 0, messages)
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
-
-        # Initialize an iterator of fetch size 64 bytes - big enough for one message
-        # but not enough for all 100 messages
-        cnt = 0
-        for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
-            self.assertEquals(messages[i], msg)
-            cnt += 1
-        self.assertEquals(cnt, 100)
-
-        # Same thing, but don't auto paginate
-        cnt = 0
-        for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
-            self.assertEquals(messages[i], msg)
-            cnt += 1
-        self.assertTrue(cnt < 100)
-
-    def test_offset_request(self):
-        # Produce a message to create the topic/partition
-        message1 = KafkaClient.create_message("testing 1")
-        req = ProduceRequest("test-offset-request", 0, [message1])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
-
-        t1 = int(time.time()*1000) # now
-        t2 = t1 + 60000 # one minute from now
-        req = OffsetRequest("test-offset-request", 0, t1, 1024)
-        self.kafka.get_offsets(req)
-
-        req = OffsetRequest("test-offset-request", 0, t2, 1024)
-        self.kafka.get_offsets(req)
-
-    def test_10k_messages(self):
-        msg_tmpl = "this is a test message with a few bytes in it. this is message number %d"
-        # TODO 10k actually fails, why?
-        msg = KafkaClient.create_gzip_message(*[msg_tmpl % i for i in range(1000)])
-        req = ProduceRequest("test-10k", 0, [msg])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-10k'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-10k-0'"))
-        #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
-        #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
-
-    def test_queue(self):
-        # Send 1000 messages
-        q = KafkaQueue(self.kafka, "test-queue", [0,1])
-        t1 = time.time()
-        for i in range(1000):
-            q.put("test %d" % i)
-        t2 = time.time()
+        produce = ProduceRequest("test_produce_consume", 0, messages=[
+            KafkaProtocol.create_message("Just a test message"),
+            KafkaProtocol.create_message("Message with a key", "foo"),
+        ])

-        # Wait for the producer to fully flush
-        time.sleep(2)
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)

-        # Copy all the messages into a list
-        t1 = time.time()
-        consumed = []
-        for i in range(1000):
-            consumed.append(q.get())
-        t2 = time.time()
+        fetch = FetchRequest("test_produce_consume", 0, 0, 1024)

-        # Verify everything is there
-        for i in range(1000):
-            self.assertTrue("test %d" % i in consumed)
+        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        self.assertEquals(fetch_resp.error, 0)
+
+        messages = list(fetch_resp.messages)
+        self.assertEquals(len(messages), 2)
+        self.assertEquals(messages[0].offset, 0)
+        self.assertEquals(messages[0].message.value, "Just a test message")
+        self.assertEquals(messages[0].message.key, None)
+        self.assertEquals(messages[1].offset, 1)
+        self.assertEquals(messages[1].message.value, "Message with a key")
+        self.assertEquals(messages[1].message.key, "foo")
+
+    def test_produce_consume_many(self):
+        produce = ProduceRequest("test_produce_consume_many", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # 1024 is not enough for 100 messages...
+        fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
+
+        (fetch_resp1,) = self.client.send_fetch_request([fetch1])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp1.highwaterMark, 100)
+        messages = list(fetch_resp1.messages)
+        self.assertTrue(len(messages) < 100)
+
+        # 10240 should be enough
+        fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240)
+        (fetch_resp2,) = self.client.send_fetch_request([fetch2])
+
+        self.assertEquals(fetch_resp2.error, 0)
+        self.assertEquals(fetch_resp2.highwaterMark, 100)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 100)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Test message %d" % i)
+            self.assertEquals(message.message.key, None)
+
+    def test_produce_consume_two_partitions(self):
+        produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
+            KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+        ])
+        produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
+            KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce1, produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+            return
+
+        fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
+        fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp1.highwaterMark, 10)
+        messages = list(fetch_resp1.messages)
+        self.assertEquals(len(messages), 10)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Partition 0 %d" % i)
+            self.assertEquals(message.message.key, None)
+        self.assertEquals(fetch_resp2.error, 0)
+        self.assertEquals(fetch_resp2.highwaterMark, 10)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 10)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Partition 1 %d" % i)
+            self.assertEquals(message.message.key, None)
+
+    ####################
+    #   Offset Tests   #
+    ####################
+
+    def test_commit_fetch_offsets(self):
+        req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
+        (resp,) = self.client.send_offset_commit_request("group", [req])
+        self.assertEquals(resp.error, 0)
+
+        req = OffsetFetchRequest("test_commit_fetch_offsets", 0)
+        (resp,) = self.client.send_offset_fetch_request("group", [req])
+        self.assertEquals(resp.error, 0)
+        self.assertEquals(resp.offset, 42)
+        self.assertEquals(resp.metadata, "metadata")
+
+
+
-        # Shutdown the queue
-        q.close()

 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
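Note: the produce/fetch round-trip exercised throughout this suite reduces to a short sketch. This example is illustrative only and not part of the commit: it assumes a Kafka 0.8 broker reachable on localhost:9092 and a hypothetical topic name, and uses only names the diff imports from kafka.client08 (KafkaClient, KafkaProtocol, ProduceRequest, FetchRequest).

# Minimal sketch of the produce/fetch round-trip used by the tests above.
# Assumes a broker on localhost:9092; "example-topic" is hypothetical.
from kafka.client08 import KafkaClient, KafkaProtocol, ProduceRequest, FetchRequest

client = KafkaClient("localhost", 9092)

# Produce two plain (uncompressed) messages to partition 0
produce = ProduceRequest("example-topic", 0, messages=[
    KafkaProtocol.create_message("hello"),
    KafkaProtocol.create_message("world"),
])
for resp in client.send_produce_request([produce]):
    assert resp.error == 0

# Fetch them back from offset 0, with a 1024-byte size limit
fetch = FetchRequest("example-topic", 0, 0, 1024)
(fetch_resp,) = client.send_fetch_request([fetch])
for msg in fetch_resp.messages:
    print("%d: %s" % (msg.offset, msg.message.value))

client.close()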