diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-19 14:03:51 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | 3499e2f6ead76e1c2db6ac754358bd57f9a15268 (patch) | |
tree | 8d7f827f2caabedadb68ad37cee4dcc908d9cd11 /test/integration.py | |
parent | 1b721325fe6b170cdfe001749dbd7b750fe59512 (diff) | |
download | kafka-python-3499e2f6ead76e1c2db6ac754358bd57f9a15268.tar.gz |
Some work on a simple consumer
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 64 |
1 files changed, 58 insertions, 6 deletions
diff --git a/test/integration.py b/test/integration.py index 91917e6..0f4d9f1 100644 --- a/test/integration.py +++ b/test/integration.py @@ -35,8 +35,10 @@ def build_kafka_classpath(): return cp class KafkaFixture(Thread): - def __init__(self, host, port): + def __init__(self, host, port, broker_id, zk_chroot=None): Thread.__init__(self) + self.broker_id = broker_id + self.zk_chroot = zk_chroot self.port = port self.capture = "" self.shouldDie = Event() @@ -50,19 +52,24 @@ class KafkaFixture(Thread): stdout = open(os.path.join(logDir, 'stdout'), 'w') # Create the config file - zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_") + if self.zk_chroot is None: + self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_") logConfig = "test/resources/log4j.properties" configFile = os.path.join(self.tmpDir, 'server.properties') f = open('test/resources/server.properties', 'r') props = f.read() f = open(configFile, 'w') - f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot}) + f.write(props % {'broker.id': self.broker_id, + 'kafka.port': self.port, + 'kafka.tmp.dir': logDir, + 'kafka.partitions': 2, + 'zk.chroot': self.zk_chroot}) f.close() cp = build_kafka_classpath() # Create the Zookeeper chroot - args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot)) + args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot)) proc = subprocess.Popen(args) ret = proc.wait() assert ret == 0 @@ -123,7 +130,7 @@ class TestKafkaClient(unittest.TestCase): cls.client = KafkaClient(host, port) else: port = get_open_port() - cls.server = KafkaFixture("localhost", port) + cls.server = KafkaFixture("localhost", port, 0) cls.server.start() cls.server.wait_for("Kafka server started") cls.client = KafkaClient("localhost", port) @@ -367,10 +374,55 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "two") - # Consumer Tests +class TestConsumer(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Broker 0 + port = get_open_port() + cls.server1 = KafkaFixture("localhost", port, 0) + cls.server1.start() + cls.server1.wait_for("Kafka server started") + + # Broker 1 + zk = cls.server1.zk_chroot + port = get_open_port() + cls.server2 = KafkaFixture("localhost", port, 1, zk) + cls.server2.start() + cls.server2.wait_for("Kafka server started") + + # Client bootstraps from broker 1 + cls.client = KafkaClient("localhost", port) + + @classmethod + def tearDownClass(cls): + cls.client.close() + cls.server1.close() + cls.server2.close() def test_consumer(self): + produce1 = ProduceRequest("test_consumer", 0, messages=[ + KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100) + ]) + + produce2 = ProduceRequest("test_consumer", 1, messages=[ + KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes if __name__ == "__main__": logging.basicConfig(level=logging.INFO) |