summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-01 15:12:21 -0500
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commiteac51e9c68c50f15962b6c785ede92cb3d512a17 (patch)
treeec814fa65e96a8a4db1d2e6aceb319ebb23a781f /test/integration.py
parent2a3d231aa61642c57537bc2128dd4f2bd30f35dd (diff)
downloadkafka-python-eac51e9c68c50f15962b6c785ede92cb3d512a17.tar.gz
Integration tests passing
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py23
1 files changed, 15 insertions, 8 deletions
diff --git a/test/integration.py b/test/integration.py
index 598b17a..9fa8538 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -10,8 +10,9 @@ import tempfile
from threading import Thread, Event
import time
import unittest
+from urlparse import urlparse
-from kafka.client08 import *
+from kafka.client import *
def get_open_port():
sock = socket.socket()
@@ -49,16 +50,26 @@ class KafkaFixture(Thread):
stdout = open(os.path.join(logDir, 'stdout'), 'w')
# Create the config file
+ zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
logConfig = "test/resources/log4j.properties"
configFile = os.path.join(self.tmpDir, 'server.properties')
f = open('test/resources/server.properties', 'r')
props = f.read()
f = open(configFile, 'w')
- f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2})
+ f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
f.close()
+ cp = build_kafka_classpath()
+
+ # Create the Zookeeper chroot
+ args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
+ proc = subprocess.Popen(args)
+ ret = proc.wait()
+ assert ret == 0
+
+
# Start Kafka
- args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
+ args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, cp, configFile))
proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
killed = False
@@ -334,11 +345,7 @@ class TestKafkaClient(unittest.TestCase):
(resp,) = self.client.send_offset_fetch_request("group", [req])
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 42)
- self.assertEquals(resp.metadata, "metadata")
-
-
-
-
+ self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)