diff options
-rw-r--r-- | README.md | 3 | ||||
m--------- | kafka-src | 0 | ||||
-rw-r--r-- | test/fixtures.py | 17 | ||||
-rw-r--r-- | test/resources/kafka.properties | 8 | ||||
-rw-r--r-- | test/test_integration.py | 32 |
5 files changed, 32 insertions, 28 deletions
@@ -18,7 +18,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE` # Status I'm following the version numbers of Kafka, plus one number to indicate the -version of this project. The current version is 0.8.1-1. This version is under +version of this project. The current version is 0.8.0-1. This version is under development, APIs are subject to change. # Usage @@ -196,6 +196,7 @@ git submodule update cd kafka-src ./sbt update ./sbt package +./sbt assembly-package-dependency ``` And then run the tests. This will actually start up real local Zookeeper diff --git a/kafka-src b/kafka-src -Subproject 9ff4e8eb10e0ddd86f257e99d55971a13242660 +Subproject 7b43f0132ce93a231b21e943d665ec4227f67b6 diff --git a/test/fixtures.py b/test/fixtures.py index abaaa5c..00c1afd 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -17,6 +17,7 @@ from urlparse import urlparse PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src") IVY_ROOT = os.path.expanduser("~/.ivy2/cache") +SCALA_VERSION = '2.8.0' if "PROJECT_ROOT" in os.environ: PROJECT_ROOT = os.environ["PROJECT_ROOT"] @@ -24,6 +25,8 @@ if "KAFKA_ROOT" in os.environ: KAFKA_ROOT = os.environ["KAFKA_ROOT"] if "IVY_ROOT" in os.environ: IVY_ROOT = os.environ["IVY_ROOT"] +if "SCALA_VERSION" in os.environ: + SCALA_VERSION = os.environ["SCALA_VERSION"] def test_resource(file): @@ -33,16 +36,8 @@ def test_resource(file): def test_classpath(): # ./kafka-src/bin/kafka-run-class.sh is the authority. jars = ["."] - jars.append(IVY_ROOT + "/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar") - jars.append(IVY_ROOT + "/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar") - jars.append(IVY_ROOT + "/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar") - jars.append(IVY_ROOT + "/log4j/log4j/jars/log4j-1.2.15.jar") - jars.append(IVY_ROOT + "/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar") - jars.append(IVY_ROOT + "/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar") - jars.append(IVY_ROOT + "/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar") - jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-2.8.0/*.jar")) - jars.extend(glob.glob(KAFKA_ROOT + "/core/lib/*.jar")) - jars.extend(glob.glob(KAFKA_ROOT + "/perf/target/scala-2.8.0/kafka*.jar")) + # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" + jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION)) jars = filter(os.path.exists, map(os.path.abspath, jars)) return ":".join(jars) @@ -314,7 +309,7 @@ class KafkaFixture(object): print("*** Starting Kafka...") self.child.start() - self.child.wait_for(r"\[Kafka Server \d+\], started") + self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) print("*** Done!") def close(self): diff --git a/test/resources/kafka.properties b/test/resources/kafka.properties index 2c8416f..d42c097 100644 --- a/test/resources/kafka.properties +++ b/test/resources/kafka.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -47,8 +47,8 @@ log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -zk.connect={zk_host}:{zk_port}/{zk_chroot} -zk.connection.timeout.ms=1000000 +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} +zookeeper.connection.timeout.ms=1000000 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter diff --git a/test/test_integration.py b/test/test_integration.py index bf1acc8..d8ead59 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -242,6 +242,7 @@ class TestKafkaClient(unittest.TestCase): # Offset Tests # #################### + @unittest.skip('commmit offset not supported in this version') def test_commit_fetch_offsets(self): req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) @@ -401,8 +402,9 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_acks_cluster_commit(self): - producer = SimpleProducer(self.client, "test_acks_cluster_commit", - req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + producer = SimpleProducer( + self.client, "test_acks_cluster_commit", + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) resp = producer.send_messages("one") self.assertEquals(len(resp), 1) @@ -548,11 +550,11 @@ class TestKafkaClient(unittest.TestCase): class TestConsumer(unittest.TestCase): @classmethod - def setUpClass(cls): # noqa + def setUpClass(cls): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port) + cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) @classmethod def tearDownClass(cls): # noqa @@ -581,7 +583,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -604,6 +606,11 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 13) + consumer.stop() + + def test_simple_consumer_blocking(self): + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + # Blocking API start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) @@ -612,13 +619,13 @@ class TestConsumer(unittest.TestCase): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_simple_consumer", 0, messages=[ + produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) - self.assertEquals(resp.offset, 100) + self.assertEquals(resp.offset, 0) # Fetch 5 messages messages = consumer.get_messages(count=5, block=True, timeout=5) @@ -650,7 +657,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -676,7 +683,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -732,7 +739,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") + consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -749,7 +756,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 10 messages that are too large (bigger than default fetch size) - messages2=[create_message(random_string(5000)) for i in range(10)] + messages2 = [create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2) for resp in self.client.send_produce_request([produce2]): @@ -757,12 +764,13 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages") + consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) return s |