summaryrefslogtreecommitdiff
path: root/test/fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/fixtures.py')
-rw-r--r--test/fixtures.py43
1 files changed, 30 insertions, 13 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index c771a58..9e283d3 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -208,9 +208,12 @@ class ZookeeperFixture(object):
self.tmp_dir = None
self.child = None
+ def out(self, message):
+ print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message))
+
def open(self):
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Zookeeper instance...")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" tmp_dir = %s" % self.tmp_dir)
@@ -229,16 +232,16 @@ class ZookeeperFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Starting Zookeeper...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"Snapshotting")
- print("*** Done!")
+ self.out("Done!")
def close(self):
- print("*** Stopping Zookeeper...")
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
@@ -272,10 +275,18 @@ class KafkaFixture(object):
self.tmp_dir = None
self.child = None
+ self.running = False
+
+ def out(self, message):
+ print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message))
def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Kafka instance")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" broker_id = %s" % self.broker_id)
@@ -303,25 +314,31 @@ class KafkaFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Creating Zookeeper chroot node...")
+ self.out("Creating Zookeeper chroot node...")
proc = subprocess.Popen(kafka_run_class_args(
"org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create", "/%s" % self.zk_chroot, "kafka-python"
))
if proc.wait() != 0:
- print("*** Failed to create Zookeeper chroot node")
+ self.out("Failed to create Zookeeper chroot node")
raise RuntimeError("Failed to create Zookeeper chroot node")
- print("*** Done!")
+ self.out("Done!")
- print("*** Starting Kafka...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
- print("*** Done!")
+ self.out("Done!")
+ self.running = True
def close(self):
- print("*** Stopping Kafka...")
+ if not self.running:
+ self.out("Instance already stopped")
+ return
+
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
+ self.running = False