author     David Arthur <mumrah@gmail.com>    2012-09-30 10:44:39 -0400
committer  David Arthur <mumrah@gmail.com>    2012-09-30 10:45:58 -0400
commit     036af5b997749320a1dd6da5c48c5120e6691365 (patch)
tree       2fdc79166e5d0a73cb25cebe1fe975feffaba1f0
parent     02e59b49dc32986c3fe20504f93033141d88f3b5 (diff)
download   kafka-python-036af5b997749320a1dd6da5c48c5120e6691365.tar.gz
Integration test improvements
Call java directly instead of using the start script. Fix a synchronization problem between producing a message and the broker flushing it to disk.
-rw-r--r--  test/integration.py               | 64
-rw-r--r--  test/resources/log4j.properties   |  2
-rw-r--r--  test/resources/server.properties  |  8
3 files changed, 53 insertions(+), 21 deletions(-)
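
A hedged sketch of the synchronization fix (the calls and broker log lines below are the ones used in the diff; the topic name is illustrative):

    # Before: produce, then sleep for a fixed second and hope the broker
    # has flushed the message before the test fetches it back.
    self.kafka.send_message_set(req)
    time.sleep(1)

    # After: block on the broker's own stdout until it reports the flush,
    # so the fetch can no longer race the disk write.
    self.kafka.send_message_set(req)
    self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
    self.assertTrue(self.server.wait_for("Flushing log 'test-produce-0'"))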
diff --git a/test/integration.py b/test/integration.py
index 2779898..fccf28f 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -1,3 +1,4 @@
+import glob
import os
import select
import shlex
@@ -19,6 +20,16 @@ def get_open_port():
sock.close()
return port
+def build_kafka_classpath():
+ baseDir = "./kafka-src"
+ jars = []
+ jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
+ return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+
class KafkaFixture(Thread):
def __init__(self, port):
Thread.__init__(self)
@@ -26,6 +37,7 @@ class KafkaFixture(Thread):
self.capture = ""
self.shouldDie = Event()
self.tmpDir = tempfile.mkdtemp()
+ print("tmp dir: %s" % self.tmpDir)
def run(self):
# Create the log directory
@@ -33,6 +45,7 @@ class KafkaFixture(Thread):
os.mkdir(logDir)
# Create the config file
+ logConfig = "test/resources/log4j.properties"
configFile = os.path.join(self.tmpDir, 'server.properties')
f = open('test/resources/server.properties', 'r')
props = f.read()
@@ -41,7 +54,7 @@ class KafkaFixture(Thread):
f.close()
# Start Kafka
- args = shlex.split("./kafka-src/bin/kafka-server-start.sh %s" % configFile)
+ args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
killed = False
@@ -57,7 +70,7 @@ class KafkaFixture(Thread):
killed = True
if proc.poll() is not None:
- shutil.rmtree(self.tmpDir)
+ #shutil.rmtree(self.tmpDir)
if killed:
break
else:
@@ -71,45 +84,55 @@ class KafkaFixture(Thread):
return False
if target in self.capture:
return True
- time.sleep(1)
+ time.sleep(0.100)
class IntegrationTest(unittest.TestCase):
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
port = get_open_port()
- self.server = KafkaFixture(port)
- self.server.start()
- self.server.wait_for("Kafka server started")
- self.kafka = KafkaClient("localhost", port)
+ cls.server = KafkaFixture(port)
+ cls.server.start()
+ cls.server.wait_for("Kafka server started")
+ cls.kafka = KafkaClient("localhost", port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka.close()
+ cls.server.shouldDie.set()
def test_produce(self):
+ # Produce a message, check that the log got created
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
+ # Same thing, different partition
req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
def test_produce_consume(self):
+ # Send two messages and consume them
message1 = KafkaClient.create_message("testing 1")
message2 = KafkaClient.create_message("testing 2")
req = ProduceRequest("test-produce-consume", 0, [message1, message2])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'"))
req = FetchRequest("test-produce-consume", 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
self.assertEquals(messages[0], message1)
self.assertEquals(messages[1], message2)
+ # Do the same, but for a different partition
message3 = KafkaClient.create_message("testing 3")
message4 = KafkaClient.create_message("testing 4")
req = ProduceRequest("test-produce-consume", 1, [message3, message4])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'"))
req = FetchRequest("test-produce-consume", 1, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
@@ -117,20 +140,29 @@ class IntegrationTest(unittest.TestCase):
self.assertEquals(messages[1], message4)
def test_check_offset(self):
+ # Produce/consume a message, check that the next offset looks correct
message1 = KafkaClient.create_message("testing 1")
req = ProduceRequest("test-check-offset", 0, [message1])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
req = FetchRequest("test-check-offset", 0, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
+ (messages, nextReq) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0], message1)
- assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
+ self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
- def tearDown(self):
- self.kafka.close()
- self.server.shouldDie.set()
+ # Produce another message, consume with the last offset
+ message2 = KafkaClient.create_message("test 2")
+ req = ProduceRequest("test-check-offset", 0, [message2])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
+
+ # Verify
+ (messages, nextReq) = self.kafka.get_message_set(nextReq)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], message2)
+ self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
if __name__ == "__main__":
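
For context, a hedged reconstruction of the wait_for helper these assertions rely on; only its tail is visible in the hunk above, so the signature and timeout handling are assumptions:

    def wait_for(self, target, timeout=10):
        # Poll the stdout captured from the broker subprocess (run()
        # appends it to self.capture) until the target string appears,
        # giving up after `timeout` seconds.
        start = time.time()
        while time.time() - start < timeout:
            if target in self.capture:
                return True
            time.sleep(0.100)
        return False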
diff --git a/test/resources/log4j.properties b/test/resources/log4j.properties
index 47a817a..c4ecd2c 100644
--- a/test/resources/log4j.properties
+++ b/test/resources/log4j.properties
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=TRACE, stdout
+log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
diff --git a/test/resources/server.properties b/test/resources/server.properties
index cd2ad9a..2eefe3b 100644
--- a/test/resources/server.properties
+++ b/test/resources/server.properties
@@ -32,7 +32,7 @@ port=%(kafka.port)d
# The number of processor threads the socket server uses for receiving and answering requests.
# Defaults to the number of cores on the machine
-num.threads=8
+num.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
@@ -68,16 +68,16 @@ num.partitions=%(kafka.partitions)d
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval=1
# The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
+log.default.flush.interval.ms=10000
# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+log.default.flush.scheduler.interval.ms=10000
############################# Log Retention Policy #############################
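
Read together, the flush settings above make flushing deterministic for the tests: the count-based flush fires after every single message, so the "Flushing log" line the fixture waits on follows each produce almost immediately, while the time-based flush and its scheduler are relaxed to 10s so they cannot fire on their own mid-test. The effective test values:

    # Count-based flush: flush after every message, so "Flushing log"
    # follows each produce.
    log.flush.interval=1
    # Time-based flush and its scheduler, relaxed to 10s so they do not
    # fire in the middle of a test.
    log.default.flush.interval.ms=10000
    log.default.flush.scheduler.interval.ms=10000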