diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-22 23:14:23 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-22 23:15:34 -0700 |
commit | b6262e4c0bc8779b331987e05d133f2a046f70b2 (patch) | |
tree | c6ce7dafa5606ca1a1e941027499d6accedd535c | |
parent | d35b8fd81fac594835e8ef2861a32b7dc08716ab (diff) | |
download | kafka-python-b6262e4c0bc8779b331987e05d133f2a046f70b2.tar.gz |
Update fixtures to eliminate extraneous logging on non-errors, split out mostly unrelated service.py, fix test in client_integration to use get_open_port, fix unintended import cascade in test_producer_integration
-rw-r--r-- | test/fixtures.py | 300 | ||||
-rw-r--r-- | test/service.py | 129 | ||||
-rw-r--r-- | test/test_client_integration.py | 5 | ||||
-rw-r--r-- | test/test_producer_integration.py | 1 | ||||
-rw-r--r-- | test/testutil.py | 26 |
5 files changed, 240 insertions, 221 deletions
diff --git a/test/fixtures.py b/test/fixtures.py index 9e283d3..bb6bc87 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,204 +1,69 @@ +import logging import glob import os -import re -import select import shutil -import socket import subprocess -import sys import tempfile -import threading -import time import uuid from urlparse import urlparse - - -PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) -KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src") -IVY_ROOT = os.path.expanduser("~/.ivy2/cache") -SCALA_VERSION = '2.8.0' - -if "PROJECT_ROOT" in os.environ: - PROJECT_ROOT = os.environ["PROJECT_ROOT"] -if "KAFKA_ROOT" in os.environ: - KAFKA_ROOT = os.environ["KAFKA_ROOT"] -if "IVY_ROOT" in os.environ: - IVY_ROOT = os.environ["IVY_ROOT"] -if "SCALA_VERSION" in os.environ: - SCALA_VERSION = os.environ["SCALA_VERSION"] - - -def test_resource(file): - return os.path.join(PROJECT_ROOT, "test", "resources", file) - - -def test_classpath(): - # ./kafka-src/bin/kafka-run-class.sh is the authority. - jars = ["."] - # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" - jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION)) - - jars = filter(os.path.exists, map(os.path.abspath, jars)) - return ":".join(jars) - - -def kafka_run_class_args(*args): - # ./kafka-src/bin/kafka-run-class.sh is the authority. - result = ["java", "-Xmx512M", "-server"] - result.append("-Dlog4j.configuration=file:%s" % test_resource("log4j.properties")) - result.append("-Dcom.sun.management.jmxremote") - result.append("-Dcom.sun.management.jmxremote.authenticate=false") - result.append("-Dcom.sun.management.jmxremote.ssl=false") - result.append("-cp") - result.append(test_classpath()) - result.extend(args) - return result - - -def get_open_port(): - sock = socket.socket() - sock.bind(("", 0)) - port = sock.getsockname()[1] - sock.close() - return port - - -def render_template(source_file, target_file, binding): - with open(source_file, "r") as handle: - template = handle.read() - with open(target_file, "w") as handle: - handle.write(template.format(**binding)) - - -class ExternalService(object): - def __init__(self, host, port): - print("Using already running service at %s:%d" % (host, port)) - self.host = host - self.port = port - - def open(self): - pass - - def close(self): - pass - - -class SpawnedService(threading.Thread): - def __init__(self, args=[]): - threading.Thread.__init__(self) - - self.args = args - self.captured_stdout = "" - self.captured_stderr = "" - self.stdout_file = None - self.stderr_file = None - self.capture_stdout = True - self.capture_stderr = True - self.show_stdout = True - self.show_stderr = True - - self.should_die = threading.Event() - - def configure_stdout(self, file=None, capture=True, show=False): - self.stdout_file = file - self.capture_stdout = capture - self.show_stdout = show - - def configure_stderr(self, file=None, capture=False, show=True): - self.stderr_file = file - self.capture_stderr = capture - self.show_stderr = show - - def run(self): - stdout_handle = None - stderr_handle = None - try: - if self.stdout_file: - stdout_handle = open(self.stdout_file, "w") - if self.stderr_file: - stderr_handle = open(self.stderr_file, "w") - self.run_with_handles(stdout_handle, stderr_handle) - finally: - if stdout_handle: - stdout_handle.close() - if stderr_handle: - stderr_handle.close() - - def run_with_handles(self, stdout_handle, stderr_handle): - child = subprocess.Popen( - self.args, - bufsize=1, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - alive = True - - while True: - (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1) - - if child.stdout in rds: - line = child.stdout.readline() - if stdout_handle: - stdout_handle.write(line) - stdout_handle.flush() - if self.capture_stdout: - self.captured_stdout += line - if self.show_stdout: - sys.stdout.write(line) - sys.stdout.flush() - - if child.stderr in rds: - line = child.stderr.readline() - if stderr_handle: - stderr_handle.write(line) - stderr_handle.flush() - if self.capture_stderr: - self.captured_stderr += line - if self.show_stderr: - sys.stderr.write(line) - sys.stderr.flush() - - if self.should_die.is_set(): - child.terminate() - alive = False - - if child.poll() is not None: - if not alive: - break - else: - raise RuntimeError("Subprocess has died. Aborting.") - - def wait_for(self, pattern, timeout=10): - t1 = time.time() - while True: - t2 = time.time() - if t2 - t1 >= timeout: - raise RuntimeError("Waiting for %r timed out" % pattern) - if re.search(pattern, self.captured_stdout) is not None: - return - if re.search(pattern, self.captured_stderr) is not None: - return - time.sleep(0.1) - - def start(self): - threading.Thread.start(self) - - def stop(self): - self.should_die.set() - self.join() - - -class ZookeeperFixture(object): - @staticmethod - def instance(): +from .service import ExternalService, SpawnedService +from .testutil import get_open_port + +class Fixture(object): + project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + scala_version = os.environ.get("SCALA_VERSION", '2.8.0') + kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, "kafka-src")) + ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache")) + + @classmethod + def test_resource(cls, filename): + return os.path.join(cls.project_root, "test", "resources", filename) + + @classmethod + def test_classpath(cls): + # ./kafka-src/bin/kafka-run-class.sh is the authority. + jars = ["."] + + # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" + jars.extend(glob.glob(cls.kafka_root + "/core/target/scala-%s/*.jar" % cls.scala_version)) + + jars = filter(os.path.exists, map(os.path.abspath, jars)) + return ":".join(jars) + + @classmethod + def kafka_run_class_args(cls, *args): + # ./kafka-src/bin/kafka-run-class.sh is the authority. + result = ["java", "-Xmx512M", "-server"] + result.append("-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties")) + result.append("-Dcom.sun.management.jmxremote") + result.append("-Dcom.sun.management.jmxremote.authenticate=false") + result.append("-Dcom.sun.management.jmxremote.ssl=false") + result.append("-cp") + result.append(cls.test_classpath()) + result.extend(args) + return result + + @classmethod + def render_template(cls, source_file, target_file, binding): + with open(source_file, "r") as handle: + template = handle.read() + with open(target_file, "w") as handle: + handle.write(template.format(**binding)) + + +class ZookeeperFixture(Fixture): + @classmethod + def instance(cls): if "ZOOKEEPER_URI" in os.environ: parse = urlparse(os.environ["ZOOKEEPER_URI"]) (host, port) = (parse.hostname, parse.port) fixture = ExternalService(host, port) else: (host, port) = ("127.0.0.1", get_open_port()) - fixture = ZookeeperFixture(host, port) - fixture.open() + fixture = cls(host, port) + + fixture.open() return fixture def __init__(self, host, port): @@ -209,22 +74,22 @@ class ZookeeperFixture(object): self.child = None def out(self, message): - print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message)) + logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message) def open(self): self.tmp_dir = tempfile.mkdtemp() self.out("Running local instance...") - print(" host = %s" % self.host) - print(" port = %s" % self.port) - print(" tmp_dir = %s" % self.tmp_dir) + logging.info(" host = %s", self.host) + logging.info(" port = %s", self.port) + logging.info(" tmp_dir = %s", self.tmp_dir) # Generate configs - template = test_resource("zookeeper.properties") + template = self.test_resource("zookeeper.properties") properties = os.path.join(self.tmp_dir, "zookeeper.properties") - render_template(template, properties, vars(self)) + self.render_template(template, properties, vars(self)) # Configure Zookeeper child process - self.child = SpawnedService(kafka_run_class_args( + self.child = SpawnedService(self.kafka_run_class_args( "org.apache.zookeeper.server.quorum.QuorumPeerMain", properties )) @@ -245,9 +110,9 @@ class ZookeeperFixture(object): shutil.rmtree(self.tmp_dir) -class KafkaFixture(object): - @staticmethod - def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2): +class KafkaFixture(Fixture): + @classmethod + def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") if "KAFKA_URI" in os.environ: @@ -278,7 +143,7 @@ class KafkaFixture(object): self.running = False def out(self, message): - print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message)) + logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message) def open(self): if self.running: @@ -287,27 +152,27 @@ class KafkaFixture(object): self.tmp_dir = tempfile.mkdtemp() self.out("Running local instance...") - print(" host = %s" % self.host) - print(" port = %s" % self.port) - print(" broker_id = %s" % self.broker_id) - print(" zk_host = %s" % self.zk_host) - print(" zk_port = %s" % self.zk_port) - print(" zk_chroot = %s" % self.zk_chroot) - print(" replicas = %s" % self.replicas) - print(" partitions = %s" % self.partitions) - print(" tmp_dir = %s" % self.tmp_dir) + logging.info(" host = %s", self.host) + logging.info(" port = %s", self.port) + logging.info(" broker_id = %s", self.broker_id) + logging.info(" zk_host = %s", self.zk_host) + logging.info(" zk_port = %s", self.zk_port) + logging.info(" zk_chroot = %s", self.zk_chroot) + logging.info(" replicas = %s", self.replicas) + logging.info(" partitions = %s", self.partitions) + logging.info(" tmp_dir = %s", self.tmp_dir) # Create directories os.mkdir(os.path.join(self.tmp_dir, "logs")) os.mkdir(os.path.join(self.tmp_dir, "data")) # Generate configs - template = test_resource("kafka.properties") + template = self.test_resource("kafka.properties") properties = os.path.join(self.tmp_dir, "kafka.properties") - render_template(template, properties, vars(self)) + self.render_template(template, properties, vars(self)) # Configure Kafka child process - self.child = SpawnedService(kafka_run_class_args( + self.child = SpawnedService(self.kafka_run_class_args( "kafka.Kafka", properties )) self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt")) @@ -315,13 +180,18 @@ class KafkaFixture(object): # Party! self.out("Creating Zookeeper chroot node...") - proc = subprocess.Popen(kafka_run_class_args( - "org.apache.zookeeper.ZooKeeperMain", - "-server", "%s:%d" % (self.zk_host, self.zk_port), - "create", "/%s" % self.zk_chroot, "kafka-python" - )) + proc = subprocess.Popen(self.kafka_run_class_args( + "org.apache.zookeeper.ZooKeeperMain", + "-server", "%s:%d" % (self.zk_host, self.zk_port), + "create", "/%s" % self.zk_chroot, "kafka-python" + ), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if proc.wait() != 0: self.out("Failed to create Zookeeper chroot node") + self.out(proc.stdout) + self.out(proc.stderr) raise RuntimeError("Failed to create Zookeeper chroot node") self.out("Done!") diff --git a/test/service.py b/test/service.py new file mode 100644 index 0000000..5e6ce61 --- /dev/null +++ b/test/service.py @@ -0,0 +1,129 @@ +import re +import select +import subprocess +import sys +import threading +import time + +__all__ = [ + 'ExternalService', + 'SpawnedService', + +] + +class ExternalService(object): + def __init__(self, host, port): + print("Using already running service at %s:%d" % (host, port)) + self.host = host + self.port = port + + def open(self): + pass + + def close(self): + pass + + +class SpawnedService(threading.Thread): + def __init__(self, args=[]): + threading.Thread.__init__(self) + + self.args = args + self.captured_stdout = "" + self.captured_stderr = "" + self.stdout_file = None + self.stderr_file = None + self.capture_stdout = True + self.capture_stderr = True + self.show_stdout = True + self.show_stderr = True + + self.should_die = threading.Event() + + def configure_stdout(self, file=None, capture=True, show=False): + self.stdout_file = file + self.capture_stdout = capture + self.show_stdout = show + + def configure_stderr(self, file=None, capture=False, show=True): + self.stderr_file = file + self.capture_stderr = capture + self.show_stderr = show + + def run(self): + stdout_handle = None + stderr_handle = None + try: + if self.stdout_file: + stdout_handle = open(self.stdout_file, "w") + if self.stderr_file: + stderr_handle = open(self.stderr_file, "w") + self.run_with_handles(stdout_handle, stderr_handle) + finally: + if stdout_handle: + stdout_handle.close() + if stderr_handle: + stderr_handle.close() + + def run_with_handles(self, stdout_handle, stderr_handle): + child = subprocess.Popen( + self.args, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + alive = True + + while True: + (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1) + + if child.stdout in rds: + line = child.stdout.readline() + if stdout_handle: + stdout_handle.write(line) + stdout_handle.flush() + if self.capture_stdout: + self.captured_stdout += line + if self.show_stdout: + sys.stdout.write(line) + sys.stdout.flush() + + if child.stderr in rds: + line = child.stderr.readline() + if stderr_handle: + stderr_handle.write(line) + stderr_handle.flush() + if self.capture_stderr: + self.captured_stderr += line + if self.show_stderr: + sys.stderr.write(line) + sys.stderr.flush() + + if self.should_die.is_set(): + child.terminate() + alive = False + + if child.poll() is not None: + if not alive: + break + else: + raise RuntimeError("Subprocess has died. Aborting.") + + def wait_for(self, pattern, timeout=10): + t1 = time.time() + while True: + t2 = time.time() + if t2 - t1 >= timeout: + raise RuntimeError("Waiting for %r timed out" % pattern) + if re.search(pattern, self.captured_stdout) is not None: + return + if re.search(pattern, self.captured_stderr) is not None: + return + time.sleep(0.1) + + def start(self): + threading.Thread.start(self) + + def stop(self): + self.should_die.set() + self.join() + diff --git a/test/test_client_integration.py b/test/test_client_integration.py index e566cce..29a0cd0 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -22,11 +22,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): def test_timeout(self): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_socket.bind(('localhost', 14567)) + server_port = get_open_port() + server_socket.bind(('localhost', server_port)) with Timer() as t: with self.assertRaises((socket.timeout, socket.error)): - conn = kafka.conn.KafkaConnection("localhost", 14567, 1.0) + conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0) self.assertGreaterEqual(t.interval, 1.0) def test_consume_none(self): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index e148ad8..eb07d0a 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -1,3 +1,4 @@ +import uuid import time import unittest diff --git a/test/testutil.py b/test/testutil.py index 2cf62eb..ccb3955 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,13 +1,24 @@ -import uuid -import time -import unittest +import logging import os import random +import socket import string -import logging +import time +import unittest +import uuid + from kafka.common import OffsetRequest from kafka import KafkaClient +__all__ = [ + 'random_string', + 'skip_integration', + 'ensure_topic_creation', + 'get_open_port', + 'KafkaIntegrationTestCase', + 'Timer', +] + def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) return s @@ -25,6 +36,13 @@ def ensure_topic_creation(client, topic_name, timeout = 30): client.load_metadata_for_topics(topic_name) time.sleep(1) +def get_open_port(): + sock = socket.socket() + sock.bind(("", 0)) + port = sock.getsockname()[1] + sock.close() + return port + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None |