diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-12 10:19:51 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-12 10:19:51 -0800 |
commit | 590d75a76a4b9d666a0340daef8ef328ca87e066 (patch) | |
tree | 492e6e27d897b29b172e67bb3c79665248a04c28 | |
parent | 11cf12ff531919e9ade2d7ebf55e9f0f6a1592da (diff) | |
download | kafka-python-590d75a76a4b9d666a0340daef8ef328ca87e066.tar.gz |
Improve Zookeeper / Kafka Fixture management
- spawn fixtures via daemon threads
- close fixtures atexit and in __del__ to avoid interpreter hangs
- raise Exception on timeouts in open()
-rw-r--r-- | test/fixtures.py | 25 | ||||
-rw-r--r-- | test/service.py | 1 |
2 files changed, 22 insertions, 4 deletions
diff --git a/test/fixtures.py b/test/fixtures.py index 3892416..3c5e694 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,3 +1,4 @@ +import atexit import logging import os import os.path @@ -130,24 +131,33 @@ class ZookeeperFixture(Fixture): timeout = 5 max_timeout = 30 backoff = 1 - while True: + end_at = time.time() + max_timeout + while time.time() < end_at: self.child = SpawnedService(args, env) self.child.start() - timeout = min(timeout, max_timeout) + timeout = min(timeout, max(end_at - time.time(), 0)) if self.child.wait_for(r"binding to port", timeout=timeout): break self.child.stop() timeout *= 2 time.sleep(backoff) + else: + raise Exception('Failed to start Zookeeper before max_timeout') self.out("Done!") + atexit.register(self.close) def close(self): + if self.child is None: + return self.out("Stopping...") self.child.stop() self.child = None self.out("Done!") shutil.rmtree(self.tmp_dir) + def __del__(self): + self.close() + class KafkaFixture(Fixture): @classmethod @@ -240,18 +250,25 @@ class KafkaFixture(Fixture): timeout = 5 max_timeout = 30 backoff = 1 - while True: + end_at = time.time() + max_timeout + while time.time() < end_at: self.child = SpawnedService(args, env) self.child.start() - timeout = min(timeout, max_timeout) + timeout = min(timeout, max(end_at - time.time(), 0)) if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=timeout): break self.child.stop() timeout *= 2 time.sleep(backoff) + else: + raise Exception('Failed to start KafkaInstance before max_timeout') self.out("Done!") self.running = True + atexit.register(self.close) + + def __del__(self): + self.close() def close(self): if not self.running: diff --git a/test/service.py b/test/service.py index ea29c33..0a9ee72 100644 --- a/test/service.py +++ b/test/service.py @@ -43,6 +43,7 @@ class SpawnedService(threading.Thread): self.should_die = threading.Event() self.child = None self.alive = False + self.daemon = True def run(self): self.run_with_handles() |