summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-19 14:03:51 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit3499e2f6ead76e1c2db6ac754358bd57f9a15268 (patch)
tree8d7f827f2caabedadb68ad37cee4dcc908d9cd11 /test/integration.py
parent1b721325fe6b170cdfe001749dbd7b750fe59512 (diff)
downloadkafka-python-3499e2f6ead76e1c2db6ac754358bd57f9a15268.tar.gz
Some work on a simple consumer
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py64
1 files changed, 58 insertions, 6 deletions
diff --git a/test/integration.py b/test/integration.py
index 91917e6..0f4d9f1 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -35,8 +35,10 @@ def build_kafka_classpath():
return cp
class KafkaFixture(Thread):
- def __init__(self, host, port):
+ def __init__(self, host, port, broker_id, zk_chroot=None):
Thread.__init__(self)
+ self.broker_id = broker_id
+ self.zk_chroot = zk_chroot
self.port = port
self.capture = ""
self.shouldDie = Event()
@@ -50,19 +52,24 @@ class KafkaFixture(Thread):
stdout = open(os.path.join(logDir, 'stdout'), 'w')
# Create the config file
- zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
+ if self.zk_chroot is None:
+ self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_")
logConfig = "test/resources/log4j.properties"
configFile = os.path.join(self.tmpDir, 'server.properties')
f = open('test/resources/server.properties', 'r')
props = f.read()
f = open(configFile, 'w')
- f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
+ f.write(props % {'broker.id': self.broker_id,
+ 'kafka.port': self.port,
+ 'kafka.tmp.dir': logDir,
+ 'kafka.partitions': 2,
+ 'zk.chroot': self.zk_chroot})
f.close()
cp = build_kafka_classpath()
# Create the Zookeeper chroot
- args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
+ args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
proc = subprocess.Popen(args)
ret = proc.wait()
assert ret == 0
@@ -123,7 +130,7 @@ class TestKafkaClient(unittest.TestCase):
cls.client = KafkaClient(host, port)
else:
port = get_open_port()
- cls.server = KafkaFixture("localhost", port)
+ cls.server = KafkaFixture("localhost", port, 0)
cls.server.start()
cls.server.wait_for("Kafka server started")
cls.client = KafkaClient("localhost", port)
@@ -367,10 +374,55 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "two")
- # Consumer Tests
+class TestConsumer(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # Broker 0
+ port = get_open_port()
+ cls.server1 = KafkaFixture("localhost", port, 0)
+ cls.server1.start()
+ cls.server1.wait_for("Kafka server started")
+
+ # Broker 1
+ zk = cls.server1.zk_chroot
+ port = get_open_port()
+ cls.server2 = KafkaFixture("localhost", port, 1, zk)
+ cls.server2.start()
+ cls.server2.wait_for("Kafka server started")
+
+ # Client bootstraps from broker 1
+ cls.client = KafkaClient("localhost", port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.client.close()
+ cls.server1.close()
+ cls.server2.close()
def test_consumer(self):
+ produce1 = ProduceRequest("test_consumer", 0, messages=[
+ KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
+ ])
+
+ produce2 = ProduceRequest("test_consumer", 1, messages=[
+ KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+
+ self.assertEquals(len(all_messages), 200)
+ self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)