diff options
author | David Arthur <mumrah@gmail.com> | 2012-09-30 10:44:39 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-09-30 10:45:58 -0400 |
commit | 036af5b997749320a1dd6da5c48c5120e6691365 (patch) | |
tree | 2fdc79166e5d0a73cb25cebe1fe975feffaba1f0 | |
parent | 02e59b49dc32986c3fe20504f93033141d88f3b5 (diff) | |
download | kafka-python-036af5b997749320a1dd6da5c48c5120e6691365.tar.gz |
Integration test improvements
Call Java directly instead of using the start script. Fix a synchronization
problem between producing a message and having it flushed to the log.
-rw-r--r-- | test/integration.py | 64 | ||||
-rw-r--r-- | test/resources/log4j.properties | 2 | ||||
-rw-r--r-- | test/resources/server.properties | 8 |
3 files changed, 53 insertions, 21 deletions
diff --git a/test/integration.py b/test/integration.py index 2779898..fccf28f 100644 --- a/test/integration.py +++ b/test/integration.py @@ -1,3 +1,4 @@ +import glob import os import select import shlex @@ -19,6 +20,16 @@ def get_open_port(): sock.close() return port +def build_kafka_classpath(): + baseDir = "./kafka-src" + jars = [] + jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar")) + jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar")) + jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar")) + jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar")) + jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar")) + return ":".join(["."] + [os.path.abspath(jar) for jar in jars]) + class KafkaFixture(Thread): def __init__(self, port): Thread.__init__(self) @@ -26,6 +37,7 @@ class KafkaFixture(Thread): self.capture = "" self.shouldDie = Event() self.tmpDir = tempfile.mkdtemp() + print("tmp dir: %s" % self.tmpDir) def run(self): # Create the log directory @@ -33,6 +45,7 @@ class KafkaFixture(Thread): os.mkdir(logDir) # Create the config file + logConfig = "test/resources/log4j.properties" configFile = os.path.join(self.tmpDir, 'server.properties') f = open('test/resources/server.properties', 'r') props = f.read() @@ -41,7 +54,7 @@ class KafkaFixture(Thread): f.close() # Start Kafka - args = shlex.split("./kafka-src/bin/kafka-server-start.sh %s" % configFile) + args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile)) proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()}) killed = False @@ -57,7 +70,7 @@ class KafkaFixture(Thread): killed = True if proc.poll() is not None: - shutil.rmtree(self.tmpDir) + #shutil.rmtree(self.tmpDir) if killed: break else: @@ -71,45 +84,55 @@ class KafkaFixture(Thread): return False if 
target in self.capture: return True - time.sleep(1) + time.sleep(0.100) class IntegrationTest(unittest.TestCase): - def setUp(self): + @classmethod + def setUpClass(cls): port = get_open_port() - self.server = KafkaFixture(port) - self.server.start() - self.server.wait_for("Kafka server started") - self.kafka = KafkaClient("localhost", port) + cls.server = KafkaFixture(port) + cls.server.start() + cls.server.wait_for("Kafka server started") + cls.kafka = KafkaClient("localhost", port) + + @classmethod + def tearDownClass(cls): + cls.kafka.close() + cls.server.shouldDie.set() def test_produce(self): + # Produce a message, check that the log got created req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0")) + # Same thing, different partition req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) def test_produce_consume(self): + # Send two messages and consume them message1 = KafkaClient.create_message("testing 1") message2 = KafkaClient.create_message("testing 2") req = ProduceRequest("test-produce-consume", 0, [message1, message2]) self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0")) - time.sleep(1) + self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'")) req = FetchRequest("test-produce-consume", 0, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) self.assertEquals(messages[0], message1) self.assertEquals(messages[1], message2) + # Do the same, but for a different partition message3 = KafkaClient.create_message("testing 3") message4 = KafkaClient.create_message("testing 4") req = ProduceRequest("test-produce-consume", 1, [message3, message4]) 
self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1")) - time.sleep(1) + self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'")) req = FetchRequest("test-produce-consume", 1, 0, 1024) (messages, req) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 2) @@ -117,20 +140,29 @@ class IntegrationTest(unittest.TestCase): self.assertEquals(messages[1], message4) def test_check_offset(self): + # Produce/consume a message, check that the next offset looks correct message1 = KafkaClient.create_message("testing 1") req = ProduceRequest("test-check-offset", 0, [message1]) self.kafka.send_message_set(req) self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0")) - time.sleep(1) + self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'")) req = FetchRequest("test-check-offset", 0, 0, 1024) - (messages, req) = self.kafka.get_message_set(req) + (messages, nextReq) = self.kafka.get_message_set(req) self.assertEquals(len(messages), 1) self.assertEquals(messages[0], message1) - assertEquals(req.offset, len(KafkaClient.encode_message(message1))) + self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1))) - def tearDown(self): - self.kafka.close() - self.server.shouldDie.set() + # Produce another message, consume with the last offset + message2 = KafkaClient.create_message("test 2") + req = ProduceRequest("test-check-offset", 0, [message2]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'")) + + # Verify + (messages, nextReq) = self.kafka.get_message_set(nextReq) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0], message2) + self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2))) if __name__ == "__main__": diff --git a/test/resources/log4j.properties b/test/resources/log4j.properties 
index 47a817a..c4ecd2c 100644 --- a/test/resources/log4j.properties +++ b/test/resources/log4j.properties @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=TRACE, stdout +log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/test/resources/server.properties b/test/resources/server.properties index cd2ad9a..2eefe3b 100644 --- a/test/resources/server.properties +++ b/test/resources/server.properties @@ -32,7 +32,7 @@ port=%(kafka.port)d # The number of processor threads the socket server uses for receiving and answering requests. # Defaults to the number of cores on the machine -num.threads=8 +num.threads=2 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer=1048576 @@ -68,16 +68,16 @@ num.partitions=%(kafka.partitions)d # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval=1 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 +log.default.flush.interval.ms=10000 # Per-topic overrides for log.default.flush.interval.ms #topic.flush.intervals.ms=topic1:1000, topic2:3000 # The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +log.default.flush.scheduler.interval.ms=10000 ############################# Log Retention Policy ############################# |