summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pylint.rc1
-rw-r--r--test/conftest.py113
-rw-r--r--test/fixtures.py299
-rw-r--r--test/test_client_integration.py2
-rw-r--r--test/test_consumer_integration.py45
-rw-r--r--test/test_failover_integration.py3
-rw-r--r--test/test_producer_integration.py64
-rw-r--r--test/testutil.py89
-rw-r--r--tox.ini1
9 files changed, 460 insertions, 157 deletions
diff --git a/pylint.rc b/pylint.rc
index d13ef51..d22e523 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -1,5 +1,6 @@
[TYPECHECK]
ignored-classes=SyncManager,_socketobject
+generated-members=py.*
[MESSAGES CONTROL]
disable=E1129
diff --git a/test/conftest.py b/test/conftest.py
index e85b977..d53ff23 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,38 +1,117 @@
from __future__ import absolute_import
-import os
+import inspect
import pytest
+from decorator import decorate
from test.fixtures import KafkaFixture, ZookeeperFixture
-
+from test.testutil import kafka_version, random_string
@pytest.fixture(scope="module")
def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
+ """Return the Kafka version set in the OS environment"""
+ return kafka_version()
@pytest.fixture(scope="module")
-def zookeeper(version, request):
- assert version
- zk = ZookeeperFixture.instance()
- yield zk
- zk.close()
+def zookeeper():
+ """Return a Zookeeper fixture"""
+ zk_instance = ZookeeperFixture.instance()
+ yield zk_instance
+ zk_instance.close()
+@pytest.fixture(scope="module")
+def kafka_broker(kafka_broker_factory):
+ """Return a Kafka broker fixture"""
+ return kafka_broker_factory()[0]
@pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
- assert version
- k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
- partitions=4)
- yield k
- k.close()
+def kafka_broker_factory(version, zookeeper):
+ """Return a Kafka broker fixture factory"""
+ assert version, 'KAFKA_VERSION must be specified to run integration tests'
+
+ _brokers = []
+ def factory(**broker_params):
+ params = {} if broker_params is None else broker_params.copy()
+ params.setdefault('partitions', 4)
+ num_brokers = params.pop('num_brokers', 1)
+ brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
+ for x in range(num_brokers))
+ _brokers.extend(brokers)
+ return brokers
+ yield factory
+
+ for broker in _brokers:
+ broker.close()
+
+@pytest.fixture
+def simple_client(kafka_broker, request, topic):
+ """Return a SimpleClient fixture"""
+ client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
+ client.ensure_topic_exists(topic)
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_client(kafka_broker, request):
+ """Return a KafkaClient fixture"""
+ (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_consumer(kafka_consumer_factory):
+ """Return a KafkaConsumer fixture"""
+ return kafka_consumer_factory()
+
+@pytest.fixture
+def kafka_consumer_factory(kafka_broker, topic, request):
+ """Return a KafkaConsumer factory fixture"""
+ _consumer = [None]
+
+ def factory(**kafka_consumer_params):
+ params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
+ params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+ _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+ return _consumer[0]
+
+ yield factory
+
+ if _consumer[0]:
+ _consumer[0].close()
+
+@pytest.fixture
+def kafka_producer(kafka_producer_factory):
+ """Return a KafkaProducer fixture"""
+ yield kafka_producer_factory()
+
+@pytest.fixture
+def kafka_producer_factory(kafka_broker, request):
+ """Return a KafkaProduce factory fixture"""
+ _producer = [None]
+
+ def factory(**kafka_producer_params):
+ params = {} if kafka_producer_params is None else kafka_producer_params.copy()
+ params.setdefault('client_id', 'producer_%s' % (request.node.name,))
+ _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
+ return _producer[0]
+
+ yield factory
+
+ if _producer[0]:
+ _producer[0].close()
+
+@pytest.fixture
+def topic(kafka_broker, request):
+ """Return a topic fixture"""
+ topic_name = '%s_%s' % (request.node.name, random_string(10))
+ kafka_broker.create_topics([topic_name])
+ return topic_name
@pytest.fixture
def conn(mocker):
+ """Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
diff --git a/test/fixtures.py b/test/fixtures.py
index 1c418fd..493a664 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,29 +4,55 @@ import atexit
import logging
import os
import os.path
-import shutil
+import random
+import socket
+import string
import subprocess
-import tempfile
import time
import uuid
-from six.moves import urllib
+import py
+from six.moves import urllib, xrange
from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka.client_async import KafkaClient
+from kafka.protocol.admin import CreateTopicsRequest
+from kafka.protocol.metadata import MetadataRequest
from test.service import ExternalService, SpawnedService
-from test.testutil import get_open_port
-
log = logging.getLogger(__name__)
+def random_string(length):
+ return "".join(random.choice(string.ascii_letters) for i in xrange(length))
+
+def version_str_to_list(version_str):
+ return tuple(map(int, version_str.split('.'))) # e.g., (0, 8, 1, 1)
+
+def version():
+ if 'KAFKA_VERSION' not in os.environ:
+ return ()
+ return version_str_to_list(os.environ['KAFKA_VERSION'])
+
+def get_open_port():
+ sock = socket.socket()
+ sock.bind(("", 0))
+ port = sock.getsockname()[1]
+ sock.close()
+ return port
class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
- project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
- kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+ project_root = os.environ.get('PROJECT_ROOT',
+ os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+ kafka_root = os.environ.get("KAFKA_ROOT",
+ os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
+ def __init__(self):
+ self.child = None
+
@classmethod
def download_official_distribution(cls,
kafka_version=None,
@@ -71,31 +97,34 @@ class Fixture(object):
@classmethod
def kafka_run_class_args(cls, *args):
result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
- result.extend(args)
+ result.extend([str(arg) for arg in args])
return result
def kafka_run_class_env(self):
env = os.environ.copy()
- env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties")
+ env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
+ self.test_resource("log4j.properties")
return env
@classmethod
def render_template(cls, source_file, target_file, binding):
- log.info('Rendering %s from template %s', target_file, source_file)
+ log.info('Rendering %s from template %s', target_file.strpath, source_file)
with open(source_file, "r") as handle:
template = handle.read()
assert len(template) > 0, 'Empty template %s' % source_file
- with open(target_file, "w") as handle:
+ with open(target_file.strpath, "w") as handle:
handle.write(template.format(**binding))
handle.flush()
os.fsync(handle)
# fsync directory for durability
# https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/
- dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY)
+ dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
+ def dump_logs(self):
+ self.child.dump_logs()
class ZookeeperFixture(Fixture):
@classmethod
@@ -111,32 +140,36 @@ class ZookeeperFixture(Fixture):
fixture.open()
return fixture
- def __init__(self, host, port):
+ def __init__(self, host, port, tmp_dir=None):
+ super(ZookeeperFixture, self).__init__()
self.host = host
self.port = port
- self.tmp_dir = None
- self.child = None
+ self.tmp_dir = tmp_dir
def kafka_run_class_env(self):
env = super(ZookeeperFixture, self).kafka_run_class_env()
- env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+ env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env
def out(self, message):
log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
def open(self):
- self.tmp_dir = tempfile.mkdtemp()
+ if self.tmp_dir is None:
+ self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+ self.tmp_dir.ensure(dir=True)
+
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port or '(auto)')
- log.info(" tmp_dir = %s", self.tmp_dir)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
# Configure Zookeeper child process
template = self.test_resource("zookeeper.properties")
- properties = os.path.join(self.tmp_dir, "zookeeper.properties")
- args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+ properties = self.tmp_dir.join("zookeeper.properties")
+ args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
+ properties.strpath)
env = self.kafka_run_class_env()
# Party!
@@ -174,7 +207,7 @@ class ZookeeperFixture(Fixture):
self.child.stop()
self.child = None
self.out("Done!")
- shutil.rmtree(self.tmp_dir)
+ self.tmp_dir.remove()
def __del__(self):
self.close()
@@ -182,9 +215,11 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
@classmethod
- def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
+ def instance(cls, broker_id, zookeeper, zk_chroot=None,
host=None, port=None,
- transport='PLAINTEXT', replicas=1, partitions=2):
+ transport='PLAINTEXT', replicas=1, partitions=2,
+ sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -195,19 +230,29 @@ class KafkaFixture(Fixture):
if host is None:
host = "localhost"
fixture = KafkaFixture(host, port, broker_id,
- zk_host, zk_port, zk_chroot,
+ zookeeper, zk_chroot,
transport=transport,
- replicas=replicas, partitions=partitions)
+ replicas=replicas, partitions=partitions,
+ sasl_mechanism=sasl_mechanism,
+ auto_create_topic=auto_create_topic,
+ tmp_dir=tmp_dir)
+
fixture.open()
return fixture
- def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
- replicas=1, partitions=2, transport='PLAINTEXT'):
+ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
+ replicas=1, partitions=2, transport='PLAINTEXT',
+ sasl_mechanism='PLAIN', auto_create_topic=True,
+ tmp_dir=None):
+ super(KafkaFixture, self).__init__()
+
self.host = host
self.port = port
self.broker_id = broker_id
+ self.auto_create_topic = auto_create_topic
self.transport = transport.upper()
+ self.sasl_mechanism = sasl_mechanism.upper()
self.ssl_dir = self.test_resource('ssl')
# TODO: checking for port connection would be better than scanning logs
@@ -215,67 +260,55 @@ class KafkaFixture(Fixture):
# The logging format changed slightly in 1.0.0
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
- self.zk_host = zk_host
- self.zk_port = zk_port
+ self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
+ # Add the attributes below for the template binding
+ self.zk_host = self.zookeeper.host
+ self.zk_port = self.zookeeper.port
self.replicas = replicas
self.partitions = partitions
- self.tmp_dir = None
- self.child = None
+ self.tmp_dir = tmp_dir
self.running = False
+ self._client = None
+
+ def bootstrap_server(self):
+ return '%s:%d' % (self.host, self.port)
+
def kafka_run_class_env(self):
env = super(KafkaFixture, self).kafka_run_class_env()
- env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+ env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env
def out(self, message):
log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message)
- def open(self):
- if self.running:
- self.out("Instance already running")
- return
-
- self.tmp_dir = tempfile.mkdtemp()
- self.out("Running local instance...")
- log.info(" host = %s", self.host)
- log.info(" port = %s", self.port or '(auto)')
- log.info(" transport = %s", self.transport)
- log.info(" broker_id = %s", self.broker_id)
- log.info(" zk_host = %s", self.zk_host)
- log.info(" zk_port = %s", self.zk_port)
- log.info(" zk_chroot = %s", self.zk_chroot)
- log.info(" replicas = %s", self.replicas)
- log.info(" partitions = %s", self.partitions)
- log.info(" tmp_dir = %s", self.tmp_dir)
-
- # Create directories
- os.mkdir(os.path.join(self.tmp_dir, "logs"))
- os.mkdir(os.path.join(self.tmp_dir, "data"))
-
+ def _create_zk_chroot(self):
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
- "-server", "%s:%d" % (self.zk_host, self.zk_port),
+ "-server",
+ "%s:%d" % (self.zookeeper.host,
+ self.zookeeper.port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- if proc.wait() != 0:
+ if proc.wait() != 0 or proc.returncode != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
- self.out("Done!")
+ self.out("Kafka chroot created in Zookeeper!")
+ def start(self):
# Configure Kafka child process
- properties = os.path.join(self.tmp_dir, "kafka.properties")
+ properties = self.tmp_dir.join("kafka.properties")
template = self.test_resource("kafka.properties")
- args = self.kafka_run_class_args("kafka.Kafka", properties)
+ args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
timeout = 5
@@ -305,14 +338,45 @@ class KafkaFixture(Fixture):
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
+
+ (self._client,) = self.get_clients(1, '_internal_client')
+
self.out("Done!")
self.running = True
+
+ def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
+ # Create directories
+ if self.tmp_dir is None:
+ self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+ self.tmp_dir.ensure(dir=True)
+ self.tmp_dir.ensure('logs', dir=True)
+ self.tmp_dir.ensure('data', dir=True)
+
+ self.out("Running local instance...")
+ log.info(" host = %s", self.host)
+ log.info(" port = %s", self.port or '(auto)')
+ log.info(" transport = %s", self.transport)
+ log.info(" broker_id = %s", self.broker_id)
+ log.info(" zk_host = %s", self.zookeeper.host)
+ log.info(" zk_port = %s", self.zookeeper.port)
+ log.info(" zk_chroot = %s", self.zk_chroot)
+ log.info(" replicas = %s", self.replicas)
+ log.info(" partitions = %s", self.partitions)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
+
+ self._create_zk_chroot()
+ self.start()
+
atexit.register(self.close)
def __del__(self):
self.close()
- def close(self):
+ def stop(self):
if not self.running:
self.out("Instance already stopped")
return
@@ -320,6 +384,117 @@ class KafkaFixture(Fixture):
self.out("Stopping...")
self.child.stop()
self.child = None
- self.out("Done!")
- shutil.rmtree(self.tmp_dir)
self.running = False
+ self.out("Stopped!")
+
+ def close(self):
+ self.stop()
+ if self.tmp_dir is not None:
+ self.tmp_dir.remove()
+ self.tmp_dir = None
+ self.out("Done!")
+
+ def dump_logs(self):
+ super(KafkaFixture, self).dump_logs()
+ self.zookeeper.dump_logs()
+
+ def _send_request(self, request, timeout=None):
+ def _failure(error):
+ raise error
+ retries = 10
+ while True:
+ node_id = self._client.least_loaded_node()
+ for ready_retry in range(40):
+ if self._client.ready(node_id, False):
+ break
+ time.sleep(.1)
+ else:
+ raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
+
+ try:
+ future = self._client.send(node_id, request)
+ future.error_on_callbacks = True
+ future.add_errback(_failure)
+ return self._client.poll(future=future, timeout_ms=timeout)
+ except Exception as exc:
+ time.sleep(1)
+ retries -= 1
+ if retries == 0:
+ raise exc
+ else:
+ pass # retry
+
+ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ if num_partitions is None:
+ num_partitions = self.partitions
+ if replication_factor is None:
+ replication_factor = self.replicas
+
+ # Try different methods to create a topic, from the fastest to the slowest
+ if self.auto_create_topic and \
+ num_partitions == self.partitions and \
+ replication_factor == self.replicas:
+ self._send_request(MetadataRequest[0]([topic_name]))
+ elif version() >= (0, 10, 1, 0):
+ request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+ replication_factor, [], [])], timeout_ms)
+ result = self._send_request(request, timeout=timeout_ms)
+ for topic_result in result[0].topic_error_codes:
+ error_code = topic_result[1]
+ if error_code != 0:
+ raise errors.for_code(error_code)
+ else:
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--create',
+ '--topic', topic_name,
+ '--partitions', self.partitions \
+ if num_partitions is None else num_partitions,
+ '--replication-factor', self.replicas \
+ if replication_factor is None \
+ else replication_factor)
+ if version() >= (0, 10):
+ args.append('--if-not-exists')
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ ret = proc.wait()
+ if ret != 0 or proc.returncode != 0:
+ output = proc.stdout.read()
+ if not 'kafka.common.TopicExistsException' in output:
+ self.out("Failed to create topic %s" % (topic_name,))
+ self.out(output)
+ self.out(proc.stderr.read())
+ raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+ def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
+ for topic_name in topic_names:
+ self._create_topic(topic_name, num_partitions, replication_factor)
+
+ def get_clients(self, cnt=1, client_id=None):
+ if client_id is None:
+ client_id = 'client'
+ return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
+ bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+
+ def get_consumers(self, cnt, topics, **params):
+ params.setdefault('client_id', 'consumer')
+ params.setdefault('heartbeat_interval_ms', 500)
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaConsumer(*topics, **params)
+
+ def get_producers(self, cnt, **params):
+ params.setdefault('client_id', 'producer')
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaProducer(**params)
+
+ def get_simple_client(self, **params):
+ params.setdefault('client_id', 'simple_client')
+ return SimpleClient(self.bootstrap_server(), **params)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 742572d..df0faef 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -17,7 +17,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+ cls.server = KafkaFixture.instance(0, cls.zk)
@classmethod
def tearDownClass(cls): # noqa
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec14..fe4e454 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@ from kafka.structs import (
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+ KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+ send_messages
)
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+ """Test KafkaConsumer
+ """
+ kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+ send_messages(simple_client, topic, 0, range(0, 100))
+ send_messages(simple_client, topic, 1, range(100, 200))
+
+ cnt = 0
+ messages = {0: set(), 1: set()}
+ for message in kafka_consumer:
+ logging.debug("Consumed message %s", repr(message))
+ cnt += 1
+ messages[message.partition].add(message.offset)
+ if cnt >= 200:
+ break
+
+ assert len(messages[0]) == 100
+ assert len(messages[1]) == 100
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None
@@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
cls.zk = ZookeeperFixture.instance()
chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+ cls.server1 = KafkaFixture.instance(0, cls.zk,
zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+ cls.server2 = KafkaFixture.instance(1, cls.zk,
zk_chroot=chroot)
cls.server = cls.server1 # Bootstrapping server
@@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- def test_kafka_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='earliest')
- n = 0
- messages = {0: set(), 1: set()}
- for m in consumer:
- logging.debug("Consumed message %s" % repr(m))
- n += 1
- messages[m.partition].add(m.offset)
- if n >= 200:
- break
-
- self.assertEqual(len(messages[0]), 100)
- self.assertEqual(len(messages[1]), 100)
-
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9141947..8531cfb 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -29,10 +29,9 @@ class TestFailover(KafkaIntegrationTestCase):
# mini zookeeper, 3 kafka brokers
self.zk = ZookeeperFixture.instance()
- kk_args = [self.zk.host, self.zk.port]
kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
'partitions': partitions}
- self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
+ self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
for i in range(replicas)]
hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83..ca0da6a 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,7 +15,50 @@ from kafka.producer.base import Producer
from kafka.structs import FetchRequestPayload, ProduceRequestPayload
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
+# while the migration to pytest is in progress
+def assert_produce_request(client, topic, messages, initial_offset, message_ct,
+ partition=0):
+ """Verify the correctness of a produce request
+ """
+ produce = ProduceRequestPayload(topic, partition, messages=messages)
+
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+ resp = client.send_produce_request([produce])
+ assert_produce_response(resp, initial_offset)
+
+ assert current_offset(client, topic, partition) == initial_offset + message_ct
+
+def assert_produce_response(resp, initial_offset):
+ """Verify that a produce response is well-formed
+ """
+ assert len(resp) == 1
+ assert resp[0].error == 0
+ assert resp[0].offset == initial_offset
+
+def test_produce_many_simple(simple_client, topic):
+ """Test multiple produces using the SimpleClient
+ """
+ start_offset = current_offset(simple_client, topic, 0)
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset,
+ 100,
+ )
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset+100,
+ 100,
+ )
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@@ -26,7 +69,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+ cls.server = KafkaFixture.instance(0, cls.zk)
@classmethod
def tearDownClass(cls): # noqa
@@ -36,23 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- def test_produce_many_simple(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset,
- 100,
- )
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset+100,
- 100,
- )
-
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
diff --git a/test/testutil.py b/test/testutil.py
index 0ec1cff..850e925 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,36 +1,20 @@
-import functools
-import logging
import operator
import os
-import random
import socket
-import string
import time
import uuid
-from six.moves import xrange
+import decorator
+import pytest
from . import unittest
-from kafka import SimpleClient
+from kafka import SimpleClient, create_message
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
-from kafka.structs import OffsetRequestPayload
-
-__all__ = [
- 'random_string',
- 'get_open_port',
- 'kafka_versions',
- 'KafkaIntegrationTestCase',
- 'Timer',
-]
-
-def random_string(l):
- return "".join(random.choice(string.ascii_letters) for i in xrange(l))
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
def kafka_versions(*versions):
- def version_str_to_list(s):
- return list(map(int, s.split('.'))) # e.g., [0, 8, 1, 1]
-
def construct_lambda(s):
if s[0].isdigit():
op_str = '='
@@ -54,25 +38,25 @@ def kafka_versions(*versions):
}
op = op_map[op_str]
version = version_str_to_list(v_str)
- return lambda a: op(version_str_to_list(a), version)
+ return lambda a: op(a, version)
validators = map(construct_lambda, versions)
- def kafka_versions(func):
- @functools.wraps(func)
- def wrapper(self):
- kafka_version = os.environ.get('KAFKA_VERSION')
+ def real_kafka_versions(func):
+ def wrapper(func, *args, **kwargs):
+ version = kafka_version()
- if not kafka_version:
- self.skipTest("no kafka version set in KAFKA_VERSION env var")
+ if not version:
+ pytest.skip("no kafka version set in KAFKA_VERSION env var")
for f in validators:
- if not f(kafka_version):
- self.skipTest("unsupported kafka version")
+ if not f(version):
+ pytest.skip("unsupported kafka version")
- return func(self)
- return wrapper
- return kafka_versions
+ return func(*args, **kwargs)
+ return decorator.decorator(wrapper, func)
+
+ return real_kafka_versions
def get_open_port():
sock = socket.socket()
@@ -81,6 +65,40 @@ def get_open_port():
sock.close()
return port
+_MESSAGES = {}
+def msg(message):
+ """Format, encode and deduplicate a message
+ """
+ global _MESSAGES #pylint: disable=global-statement
+ if message not in _MESSAGES:
+ _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
+
+ return _MESSAGES[message].encode('utf-8')
+
+def send_messages(client, topic, partition, messages):
+ """Send messages to a topic's partition
+ """
+ messages = [create_message(msg(str(m))) for m in messages]
+ produce = ProduceRequestPayload(topic, partition, messages=messages)
+ resp, = client.send_produce_request([produce])
+ assert resp.error == 0
+
+ return [x.value for x in messages]
+
+def current_offset(client, topic, partition, kafka_broker=None):
+ """Get the current offset of a topic's partition
+ """
+ try:
+ offsets, = client.send_offset_request([OffsetRequestPayload(topic,
+ partition, -1, 1)])
+ except Exception:
+ # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
+ if kafka_broker:
+ kafka_broker.dump_logs()
+ raise
+ else:
+ return offsets.offsets[0]
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
@@ -122,7 +140,8 @@ class KafkaIntegrationTestCase(unittest.TestCase):
def current_offset(self, topic, partition):
try:
- offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
+ offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
+ partition, -1, 1)])
except Exception:
# XXX: We've seen some UnknownErrors here and can't debug w/o server logs
self.zk.child.dump_logs()
@@ -132,7 +151,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
return offsets.offsets[0]
def msgs(self, iterable):
- return [ self.msg(x) for x in iterable ]
+ return [self.msg(x) for x in iterable]
def msg(self, s):
if s not in self._messages:
diff --git a/tox.ini b/tox.ini
index 35dc842..ad95f93 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,6 +20,7 @@ deps =
xxhash
crc32c
py26: unittest2
+ decorator
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =