summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-22 23:14:23 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-22 23:15:34 -0700
commitb6262e4c0bc8779b331987e05d133f2a046f70b2 (patch)
treec6ce7dafa5606ca1a1e941027499d6accedd535c
parentd35b8fd81fac594835e8ef2861a32b7dc08716ab (diff)
downloadkafka-python-b6262e4c0bc8779b331987e05d133f2a046f70b2.tar.gz
Update fixtures to eliminate extraneous logging on non-errors, split out mostly unrelated service.py, fix test in client_integration to use get_open_port, fix unintended import cascade in test_producer_integration
-rw-r--r--test/fixtures.py300
-rw-r--r--test/service.py129
-rw-r--r--test/test_client_integration.py5
-rw-r--r--test/test_producer_integration.py1
-rw-r--r--test/testutil.py26
5 files changed, 240 insertions, 221 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index 9e283d3..bb6bc87 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -1,204 +1,69 @@
+import logging
import glob
import os
-import re
-import select
import shutil
-import socket
import subprocess
-import sys
import tempfile
-import threading
-import time
import uuid
from urlparse import urlparse
-
-
-PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
-KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src")
-IVY_ROOT = os.path.expanduser("~/.ivy2/cache")
-SCALA_VERSION = '2.8.0'
-
-if "PROJECT_ROOT" in os.environ:
- PROJECT_ROOT = os.environ["PROJECT_ROOT"]
-if "KAFKA_ROOT" in os.environ:
- KAFKA_ROOT = os.environ["KAFKA_ROOT"]
-if "IVY_ROOT" in os.environ:
- IVY_ROOT = os.environ["IVY_ROOT"]
-if "SCALA_VERSION" in os.environ:
- SCALA_VERSION = os.environ["SCALA_VERSION"]
-
-
-def test_resource(file):
- return os.path.join(PROJECT_ROOT, "test", "resources", file)
-
-
-def test_classpath():
- # ./kafka-src/bin/kafka-run-class.sh is the authority.
- jars = ["."]
- # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
- jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION))
-
- jars = filter(os.path.exists, map(os.path.abspath, jars))
- return ":".join(jars)
-
-
-def kafka_run_class_args(*args):
- # ./kafka-src/bin/kafka-run-class.sh is the authority.
- result = ["java", "-Xmx512M", "-server"]
- result.append("-Dlog4j.configuration=file:%s" % test_resource("log4j.properties"))
- result.append("-Dcom.sun.management.jmxremote")
- result.append("-Dcom.sun.management.jmxremote.authenticate=false")
- result.append("-Dcom.sun.management.jmxremote.ssl=false")
- result.append("-cp")
- result.append(test_classpath())
- result.extend(args)
- return result
-
-
-def get_open_port():
- sock = socket.socket()
- sock.bind(("", 0))
- port = sock.getsockname()[1]
- sock.close()
- return port
-
-
-def render_template(source_file, target_file, binding):
- with open(source_file, "r") as handle:
- template = handle.read()
- with open(target_file, "w") as handle:
- handle.write(template.format(**binding))
-
-
-class ExternalService(object):
- def __init__(self, host, port):
- print("Using already running service at %s:%d" % (host, port))
- self.host = host
- self.port = port
-
- def open(self):
- pass
-
- def close(self):
- pass
-
-
-class SpawnedService(threading.Thread):
- def __init__(self, args=[]):
- threading.Thread.__init__(self)
-
- self.args = args
- self.captured_stdout = ""
- self.captured_stderr = ""
- self.stdout_file = None
- self.stderr_file = None
- self.capture_stdout = True
- self.capture_stderr = True
- self.show_stdout = True
- self.show_stderr = True
-
- self.should_die = threading.Event()
-
- def configure_stdout(self, file=None, capture=True, show=False):
- self.stdout_file = file
- self.capture_stdout = capture
- self.show_stdout = show
-
- def configure_stderr(self, file=None, capture=False, show=True):
- self.stderr_file = file
- self.capture_stderr = capture
- self.show_stderr = show
-
- def run(self):
- stdout_handle = None
- stderr_handle = None
- try:
- if self.stdout_file:
- stdout_handle = open(self.stdout_file, "w")
- if self.stderr_file:
- stderr_handle = open(self.stderr_file, "w")
- self.run_with_handles(stdout_handle, stderr_handle)
- finally:
- if stdout_handle:
- stdout_handle.close()
- if stderr_handle:
- stderr_handle.close()
-
- def run_with_handles(self, stdout_handle, stderr_handle):
- child = subprocess.Popen(
- self.args,
- bufsize=1,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- alive = True
-
- while True:
- (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
-
- if child.stdout in rds:
- line = child.stdout.readline()
- if stdout_handle:
- stdout_handle.write(line)
- stdout_handle.flush()
- if self.capture_stdout:
- self.captured_stdout += line
- if self.show_stdout:
- sys.stdout.write(line)
- sys.stdout.flush()
-
- if child.stderr in rds:
- line = child.stderr.readline()
- if stderr_handle:
- stderr_handle.write(line)
- stderr_handle.flush()
- if self.capture_stderr:
- self.captured_stderr += line
- if self.show_stderr:
- sys.stderr.write(line)
- sys.stderr.flush()
-
- if self.should_die.is_set():
- child.terminate()
- alive = False
-
- if child.poll() is not None:
- if not alive:
- break
- else:
- raise RuntimeError("Subprocess has died. Aborting.")
-
- def wait_for(self, pattern, timeout=10):
- t1 = time.time()
- while True:
- t2 = time.time()
- if t2 - t1 >= timeout:
- raise RuntimeError("Waiting for %r timed out" % pattern)
- if re.search(pattern, self.captured_stdout) is not None:
- return
- if re.search(pattern, self.captured_stderr) is not None:
- return
- time.sleep(0.1)
-
- def start(self):
- threading.Thread.start(self)
-
- def stop(self):
- self.should_die.set()
- self.join()
-
-
-class ZookeeperFixture(object):
- @staticmethod
- def instance():
+from .service import ExternalService, SpawnedService
+from .testutil import get_open_port
+
+class Fixture(object):
+ project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+ scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
+ kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, "kafka-src"))
+ ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
+
+ @classmethod
+ def test_resource(cls, filename):
+ return os.path.join(cls.project_root, "test", "resources", filename)
+
+ @classmethod
+ def test_classpath(cls):
+ # ./kafka-src/bin/kafka-run-class.sh is the authority.
+ jars = ["."]
+
+ # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
+ jars.extend(glob.glob(cls.kafka_root + "/core/target/scala-%s/*.jar" % cls.scala_version))
+
+ jars = filter(os.path.exists, map(os.path.abspath, jars))
+ return ":".join(jars)
+
+ @classmethod
+ def kafka_run_class_args(cls, *args):
+ # ./kafka-src/bin/kafka-run-class.sh is the authority.
+ result = ["java", "-Xmx512M", "-server"]
+ result.append("-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties"))
+ result.append("-Dcom.sun.management.jmxremote")
+ result.append("-Dcom.sun.management.jmxremote.authenticate=false")
+ result.append("-Dcom.sun.management.jmxremote.ssl=false")
+ result.append("-cp")
+ result.append(cls.test_classpath())
+ result.extend(args)
+ return result
+
+ @classmethod
+ def render_template(cls, source_file, target_file, binding):
+ with open(source_file, "r") as handle:
+ template = handle.read()
+ with open(target_file, "w") as handle:
+ handle.write(template.format(**binding))
+
+
+class ZookeeperFixture(Fixture):
+ @classmethod
+ def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
- fixture = ZookeeperFixture(host, port)
- fixture.open()
+ fixture = cls(host, port)
+
+ fixture.open()
return fixture
def __init__(self, host, port):
@@ -209,22 +74,22 @@ class ZookeeperFixture(object):
self.child = None
def out(self, message):
- print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message))
+ logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
def open(self):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
- print(" host = %s" % self.host)
- print(" port = %s" % self.port)
- print(" tmp_dir = %s" % self.tmp_dir)
+ logging.info(" host = %s", self.host)
+ logging.info(" port = %s", self.port)
+ logging.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs
- template = test_resource("zookeeper.properties")
+ template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
- render_template(template, properties, vars(self))
+ self.render_template(template, properties, vars(self))
# Configure Zookeeper child process
- self.child = SpawnedService(kafka_run_class_args(
+ self.child = SpawnedService(self.kafka_run_class_args(
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties
))
@@ -245,9 +110,9 @@ class ZookeeperFixture(object):
shutil.rmtree(self.tmp_dir)
-class KafkaFixture(object):
- @staticmethod
- def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
+class KafkaFixture(Fixture):
+ @classmethod
+ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -278,7 +143,7 @@ class KafkaFixture(object):
self.running = False
def out(self, message):
- print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message))
+ logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
def open(self):
if self.running:
@@ -287,27 +152,27 @@ class KafkaFixture(object):
self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...")
- print(" host = %s" % self.host)
- print(" port = %s" % self.port)
- print(" broker_id = %s" % self.broker_id)
- print(" zk_host = %s" % self.zk_host)
- print(" zk_port = %s" % self.zk_port)
- print(" zk_chroot = %s" % self.zk_chroot)
- print(" replicas = %s" % self.replicas)
- print(" partitions = %s" % self.partitions)
- print(" tmp_dir = %s" % self.tmp_dir)
+ logging.info(" host = %s", self.host)
+ logging.info(" port = %s", self.port)
+ logging.info(" broker_id = %s", self.broker_id)
+ logging.info(" zk_host = %s", self.zk_host)
+ logging.info(" zk_port = %s", self.zk_port)
+ logging.info(" zk_chroot = %s", self.zk_chroot)
+ logging.info(" replicas = %s", self.replicas)
+ logging.info(" partitions = %s", self.partitions)
+ logging.info(" tmp_dir = %s", self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs
- template = test_resource("kafka.properties")
+ template = self.test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties")
- render_template(template, properties, vars(self))
+ self.render_template(template, properties, vars(self))
# Configure Kafka child process
- self.child = SpawnedService(kafka_run_class_args(
+ self.child = SpawnedService(self.kafka_run_class_args(
"kafka.Kafka", properties
))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
@@ -315,13 +180,18 @@ class KafkaFixture(object):
# Party!
self.out("Creating Zookeeper chroot node...")
- proc = subprocess.Popen(kafka_run_class_args(
- "org.apache.zookeeper.ZooKeeperMain",
- "-server", "%s:%d" % (self.zk_host, self.zk_port),
- "create", "/%s" % self.zk_chroot, "kafka-python"
- ))
+ proc = subprocess.Popen(self.kafka_run_class_args(
+ "org.apache.zookeeper.ZooKeeperMain",
+ "-server", "%s:%d" % (self.zk_host, self.zk_port),
+ "create", "/%s" % self.zk_chroot, "kafka-python"
+ ),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node")
+ self.out(proc.stdout)
+ self.out(proc.stderr)
raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!")
diff --git a/test/service.py b/test/service.py
new file mode 100644
index 0000000..5e6ce61
--- /dev/null
+++ b/test/service.py
@@ -0,0 +1,129 @@
+import re
+import select
+import subprocess
+import sys
+import threading
+import time
+
+__all__ = [
+ 'ExternalService',
+ 'SpawnedService',
+
+]
+
+class ExternalService(object):
+ def __init__(self, host, port):
+ print("Using already running service at %s:%d" % (host, port))
+ self.host = host
+ self.port = port
+
+ def open(self):
+ pass
+
+ def close(self):
+ pass
+
+
+class SpawnedService(threading.Thread):
+ def __init__(self, args=[]):
+ threading.Thread.__init__(self)
+
+ self.args = args
+ self.captured_stdout = ""
+ self.captured_stderr = ""
+ self.stdout_file = None
+ self.stderr_file = None
+ self.capture_stdout = True
+ self.capture_stderr = True
+ self.show_stdout = True
+ self.show_stderr = True
+
+ self.should_die = threading.Event()
+
+ def configure_stdout(self, file=None, capture=True, show=False):
+ self.stdout_file = file
+ self.capture_stdout = capture
+ self.show_stdout = show
+
+ def configure_stderr(self, file=None, capture=False, show=True):
+ self.stderr_file = file
+ self.capture_stderr = capture
+ self.show_stderr = show
+
+ def run(self):
+ stdout_handle = None
+ stderr_handle = None
+ try:
+ if self.stdout_file:
+ stdout_handle = open(self.stdout_file, "w")
+ if self.stderr_file:
+ stderr_handle = open(self.stderr_file, "w")
+ self.run_with_handles(stdout_handle, stderr_handle)
+ finally:
+ if stdout_handle:
+ stdout_handle.close()
+ if stderr_handle:
+ stderr_handle.close()
+
+ def run_with_handles(self, stdout_handle, stderr_handle):
+ child = subprocess.Popen(
+ self.args,
+ bufsize=1,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ alive = True
+
+ while True:
+ (rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
+
+ if child.stdout in rds:
+ line = child.stdout.readline()
+ if stdout_handle:
+ stdout_handle.write(line)
+ stdout_handle.flush()
+ if self.capture_stdout:
+ self.captured_stdout += line
+ if self.show_stdout:
+ sys.stdout.write(line)
+ sys.stdout.flush()
+
+ if child.stderr in rds:
+ line = child.stderr.readline()
+ if stderr_handle:
+ stderr_handle.write(line)
+ stderr_handle.flush()
+ if self.capture_stderr:
+ self.captured_stderr += line
+ if self.show_stderr:
+ sys.stderr.write(line)
+ sys.stderr.flush()
+
+ if self.should_die.is_set():
+ child.terminate()
+ alive = False
+
+ if child.poll() is not None:
+ if not alive:
+ break
+ else:
+ raise RuntimeError("Subprocess has died. Aborting.")
+
+ def wait_for(self, pattern, timeout=10):
+ t1 = time.time()
+ while True:
+ t2 = time.time()
+ if t2 - t1 >= timeout:
+ raise RuntimeError("Waiting for %r timed out" % pattern)
+ if re.search(pattern, self.captured_stdout) is not None:
+ return
+ if re.search(pattern, self.captured_stderr) is not None:
+ return
+ time.sleep(0.1)
+
+ def start(self):
+ threading.Thread.start(self)
+
+ def stop(self):
+ self.should_die.set()
+ self.join()
+
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index e566cce..29a0cd0 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -22,11 +22,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
def test_timeout(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server_socket.bind(('localhost', 14567))
+ server_port = get_open_port()
+ server_socket.bind(('localhost', server_port))
with Timer() as t:
with self.assertRaises((socket.timeout, socket.error)):
- conn = kafka.conn.KafkaConnection("localhost", 14567, 1.0)
+ conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0)
self.assertGreaterEqual(t.interval, 1.0)
def test_consume_none(self):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index e148ad8..eb07d0a 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -1,3 +1,4 @@
+import uuid
import time
import unittest
diff --git a/test/testutil.py b/test/testutil.py
index 2cf62eb..ccb3955 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,13 +1,24 @@
-import uuid
-import time
-import unittest
+import logging
import os
import random
+import socket
import string
-import logging
+import time
+import unittest
+import uuid
+
from kafka.common import OffsetRequest
from kafka import KafkaClient
+__all__ = [
+ 'random_string',
+ 'skip_integration',
+ 'ensure_topic_creation',
+ 'get_open_port',
+ 'KafkaIntegrationTestCase',
+ 'Timer',
+]
+
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
return s
@@ -25,6 +36,13 @@ def ensure_topic_creation(client, topic_name, timeout = 30):
client.load_metadata_for_topics(topic_name)
time.sleep(1)
+def get_open_port():
+ sock = socket.socket()
+ sock.bind(("", 0))
+ port = sock.getsockname()[1]
+ sock.close()
+ return port
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None