Diffstat (limited to 'test')
-rw-r--r--  test/__init__.py               |   9
-rw-r--r--  test/fixtures.py               | 298
-rw-r--r--  test/service.py                |  20
-rw-r--r--  test/test_sasl_integration.py  |  80
-rw-r--r--  test/testutil.py               |   5
5 files changed, 323 insertions(+), 89 deletions(-)
diff --git a/test/__init__.py b/test/__init__.py
index 71f667d..329277d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -2,14 +2,7 @@ from __future__ import absolute_import
 
 # Set default logging handler to avoid "No handler found" warnings.
 import logging
-try: # Python 2.7+
-    from logging import NullHandler
-except ImportError:
-    class NullHandler(logging.Handler):
-        def emit(self, record):
-            pass
-
-logging.getLogger(__name__).addHandler(NullHandler())
+logging.basicConfig(level=logging.INFO)
 
 from kafka.future import Future
 Future.error_on_callbacks = True # always fail during testing
diff --git a/test/fixtures.py b/test/fixtures.py
index 557fca6..78cdc5c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -14,6 +14,7 @@ from kafka.vendor.six.moves import urllib, range
 from kafka.vendor.six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401
 
 from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
+from kafka.errors import InvalidReplicationFactorError
 from kafka.protocol.admin import CreateTopicsRequest
 from kafka.protocol.metadata import MetadataRequest
 from test.testutil import env_kafka_version, random_string
@@ -140,6 +141,16 @@ class Fixture(object):
         dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
         os.fsync(dirfd)
         os.close(dirfd)
+        log.debug("Template string:")
+        for line in template.splitlines():
+            log.debug(' ' + line.strip())
+        log.debug("Rendered template:")
+        with open(target_file.strpath, 'r') as o:
+            for line in o:
+                log.debug(' ' + line.strip())
+        log.debug("binding:")
+        for key, value in binding.items():
+            log.debug(" {key}={value}".format(key=key, value=value))
 
     def dump_logs(self):
         self.child.dump_logs()
@@ -233,11 +244,14 @@ class ZookeeperFixture(Fixture):
 
 
 class KafkaFixture(Fixture):
+    broker_user = 'alice'
+    broker_password = 'alice-secret'
+
     @classmethod
     def instance(cls, broker_id, zookeeper, zk_chroot=None,
                  host=None, port=None,
                  transport='PLAINTEXT', replicas=1, partitions=2,
-                 sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+                 sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
 
@@ -261,7 +275,7 @@ class KafkaFixture(Fixture):
 
     def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
                  replicas=1, partitions=2, transport='PLAINTEXT',
-                 sasl_mechanism='PLAIN', auto_create_topic=True,
+                 sasl_mechanism=None, auto_create_topic=True,
                  tmp_dir=None):
         super(KafkaFixture, self).__init__()
 
@@ -271,13 +285,18 @@ class KafkaFixture(Fixture):
         self.broker_id = broker_id
         self.auto_create_topic = auto_create_topic
         self.transport = transport.upper()
-        self.sasl_mechanism = sasl_mechanism.upper()
+        if sasl_mechanism is not None:
+            self.sasl_mechanism = sasl_mechanism.upper()
+        else:
+            self.sasl_mechanism = None
         self.ssl_dir = self.test_resource('ssl')
 
         # TODO: checking for port connection would be better than scanning logs
         # until then, we need the pattern to work across all supported broker versions
         # The logging format changed slightly in 1.0.0
         self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
+        # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism
+        self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
 
         self.zookeeper = zookeeper
         self.zk_chroot = zk_chroot
@@ -292,6 +311,64 @@ class KafkaFixture(Fixture):
         self.running = False
         self._client = None
+        self.sasl_config = ''
+        self.jaas_config = ''
+
+    def _sasl_config(self):
+        if not self.sasl_enabled:
+            return ''
+
+        sasl_config = "sasl.enabled.mechanisms={mechanism}\n"
+        sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n"
+        return sasl_config.format(mechanism=self.sasl_mechanism)
+
+    def _jaas_config(self):
+        if not self.sasl_enabled:
+            return ''
+
+        elif self.sasl_mechanism == 'PLAIN':
+            jaas_config = (
+                "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
+                ' username="{user}" password="{password}" user_{user}="{password}";\n'
+            )
+        elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
+            jaas_config = (
+                "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
+                ' username="{user}" password="{password}";\n'
+            )
+        else:
+            raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism))
+        return jaas_config.format(user=self.broker_user, password=self.broker_password)
+
+    def _add_scram_user(self):
+        self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user))
+        args = self.kafka_run_class_args(
+            "kafka.admin.ConfigCommand",
+            "--zookeeper",
+            "%s:%d/%s" % (self.zookeeper.host,
+                          self.zookeeper.port,
+                          self.zk_chroot),
+            "--alter",
+            "--entity-type", "users",
+            "--entity-name", self.broker_user,
+            "--add-config",
+            "{}=[password={}]".format(self.sasl_mechanism, self.broker_password),
+        )
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        stdout, stderr = proc.communicate()
+
+        if proc.returncode != 0:
+            self.out("Failed to save credentials to zookeeper!")
+            self.out(stdout)
+            self.out(stderr)
+            raise RuntimeError("Failed to save credentials to zookeeper!")
+        self.out("User created.")
+
+    @property
+    def sasl_enabled(self):
+        return self.sasl_mechanism is not None
 
     def bootstrap_server(self):
         return '%s:%d' % (self.host, self.port)
 
@@ -328,9 +405,17 @@ class KafkaFixture(Fixture):
     def start(self):
         # Configure Kafka child process
         properties = self.tmp_dir.join("kafka.properties")
-        template = self.test_resource("kafka.properties")
+        jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
+        properties_template = self.test_resource("kafka.properties")
+        jaas_conf_template = self.test_resource("kafka_server_jaas.conf")
+
         args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
         env = self.kafka_run_class_env()
+        if self.sasl_enabled:
+            opts = env.get('KAFKA_OPTS', '').strip()
+            opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath)
+            env['KAFKA_OPTS'] = opts
+            self.render_template(jaas_conf_template, jaas_conf, vars(self))
 
         timeout = 5
         max_timeout = 120
@@ -345,14 +430,17 @@
             if auto_port:
                 self.port = get_open_port()
             self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
-            self.render_template(template, properties, vars(self))
+            self.render_template(properties_template, properties, vars(self))
+
             self.child = SpawnedService(args, env)
             self.child.start()
             timeout = min(timeout, max(end_at - time.time(), 0))
-            if self.child.wait_for(self.start_pattern, timeout=timeout):
+            if self._broker_ready(timeout) and self._scram_user_present(timeout):
                 break
+
             self.child.dump_logs()
             self.child.stop()
+
             timeout *= 2
             time.sleep(backoff)
             tries += 1
@@ -360,11 +448,20 @@
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
 
-        (self._client,) = self.get_clients(1, '_internal_client')
+        (self._client,) = self.get_clients(1, client_id='_internal_client')
 
         self.out("Done!")
         self.running = True
 
+    def _broker_ready(self, timeout):
+        return self.child.wait_for(self.start_pattern, timeout=timeout)
+
+    def _scram_user_present(self, timeout):
+        # no need to wait for scram user if scram is not used
+        if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'):
+            return True
+        return self.child.wait_for(self.scram_pattern, timeout=timeout)
+
     def open(self):
         if self.running:
             self.out("Instance already running")
@@ -378,18 +475,24 @@
         self.tmp_dir.ensure('data', dir=True)
 
         self.out("Running local instance...")
-        log.info("  host       = %s", self.host)
-        log.info("  port       = %s", self.port or '(auto)')
-        log.info("  transport  = %s", self.transport)
-        log.info("  broker_id  = %s", self.broker_id)
-        log.info("  zk_host    = %s", self.zookeeper.host)
-        log.info("  zk_port    = %s", self.zookeeper.port)
-        log.info("  zk_chroot  = %s", self.zk_chroot)
-        log.info("  replicas   = %s", self.replicas)
-        log.info("  partitions = %s", self.partitions)
-        log.info("  tmp_dir    = %s", self.tmp_dir.strpath)
+        log.info("  host            = %s", self.host)
+        log.info("  port            = %s", self.port or '(auto)')
+        log.info("  transport       = %s", self.transport)
+        log.info("  sasl_mechanism  = %s", self.sasl_mechanism)
+        log.info("  broker_id       = %s", self.broker_id)
+        log.info("  zk_host         = %s", self.zookeeper.host)
+        log.info("  zk_port         = %s", self.zookeeper.port)
+        log.info("  zk_chroot       = %s", self.zk_chroot)
+        log.info("  replicas        = %s", self.replicas)
+        log.info("  partitions      = %s", self.partitions)
+        log.info("  tmp_dir         = %s", self.tmp_dir.strpath)
 
         self._create_zk_chroot()
+        self.sasl_config = self._sasl_config()
+        self.jaas_config = self._jaas_config()
+        # add user to zookeeper for the first server
+        if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
+            self._add_scram_user()
         self.start()
 
         atexit.register(self.close)
@@ -437,7 +540,8 @@ class KafkaFixture(Fixture):
                 future = self._client.send(node_id, request)
                 future.error_on_callbacks = True
                 future.add_errback(_failure)
-                return self._client.poll(future=future, timeout_ms=timeout)
+                self._client.poll(future=future, timeout_ms=timeout)
+                return future.value
             except Exception as exc:
                 time.sleep(1)
                 retries -= 1
@@ -446,80 +550,122 @@
         else:
             pass # retry
 
-    def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+    def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
         if num_partitions is None:
             num_partitions = self.partitions
         if replication_factor is None:
             replication_factor = self.replicas
 
         # Try different methods to create a topic, from the fastest to the slowest
-        if self.auto_create_topic and \
-           num_partitions == self.partitions and \
-           replication_factor == self.replicas:
-            self._send_request(MetadataRequest[0]([topic_name]))
+        if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
+            self._create_topic_via_metadata(topic_name, timeout_ms)
         elif env_kafka_version() >= (0, 10, 1, 0):
-            request = CreateTopicsRequest[0]([(topic_name, num_partitions,
-                                               replication_factor, [], [])], timeout_ms)
-            result = self._send_request(request, timeout=timeout_ms)
-            for topic_result in result[0].topic_error_codes:
-                error_code = topic_result[1]
-                if error_code != 0:
-                    raise errors.for_code(error_code)
+            try:
+                self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
+            except InvalidReplicationFactorError:
+                # wait and try again
+                # on travis the brokers sometimes take a while to find themselves
+                time.sleep(0.5)
+                self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
         else:
-            args = self.kafka_run_class_args('kafka.admin.TopicCommand',
-                                             '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
-                                                                          self.zookeeper.port,
-                                                                          self.zk_chroot),
-                                             '--create',
-                                             '--topic', topic_name,
-                                             '--partitions', self.partitions \
-                                                 if num_partitions is None else num_partitions,
-                                             '--replication-factor', self.replicas \
-                                                 if replication_factor is None \
-                                                 else replication_factor)
-            if env_kafka_version() >= (0, 10):
-                args.append('--if-not-exists')
-            env = self.kafka_run_class_env()
-            proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            stdout, stderr = proc.communicate()
-            if proc.returncode != 0:
-                if 'kafka.common.TopicExistsException' not in stdout:
-                    self.out("Failed to create topic %s" % (topic_name,))
-                    self.out(stdout)
-                    self.out(stderr)
-                    raise RuntimeError("Failed to create topic %s" % (topic_name,))
+            self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
+
+    def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
+        self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+
+    def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+        request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+                                           replication_factor, [], [])], timeout_ms)
+        response = self._send_request(request, timeout=timeout_ms)
+        for topic_result in response.topic_errors:
+            error_code = topic_result[1]
+            if error_code != 0:
+                raise errors.for_code(error_code)
+
+    def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
+        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+                                                                      self.zookeeper.port,
+                                                                      self.zk_chroot),
+                                         '--create',
+                                         '--topic', topic_name,
+                                         '--partitions', self.partitions \
+                                             if num_partitions is None else num_partitions,
+                                         '--replication-factor', self.replicas \
+                                             if replication_factor is None \
+                                             else replication_factor)
+        if env_kafka_version() >= (0, 10):
+            args.append('--if-not-exists')
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            if 'kafka.common.TopicExistsException' not in stdout:
+                self.out("Failed to create topic %s" % (topic_name,))
+                self.out(stdout)
+                self.out(stderr)
+                raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+    def get_topic_names(self):
+        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
                                                                      self.zookeeper.port,
                                                                      self.zk_chroot),
+                                         '--list'
+                                         )
+        env = self.kafka_run_class_env()
+        env.pop('KAFKA_LOG4J_OPTS')
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            self.out("Failed to list topics!")
+            self.out(stdout)
+            self.out(stderr)
+            raise RuntimeError("Failed to list topics!")
+        return stdout.decode().splitlines(False)
 
     def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
         for topic_name in topic_names:
             self._create_topic(topic_name, num_partitions, replication_factor)
 
-    def get_clients(self, cnt=1, client_id=None):
-        if client_id is None:
-            client_id = 'client'
-        return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
-                                 bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
-
-    def get_admin_clients(self, cnt=1, **params):
-        params.setdefault('client_id', 'admin_client')
-        params['bootstrap_servers'] = self.bootstrap_server()
+    def _enrich_client_params(self, params, **defaults):
+        params = params.copy()
+        for key, value in defaults.items():
+            params.setdefault(key, value)
+        params.setdefault('bootstrap_servers', self.bootstrap_server())
+        if self.sasl_enabled:
+            params.setdefault('sasl_mechanism', self.sasl_mechanism)
+            params.setdefault('security_protocol', self.transport)
+            if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+                params.setdefault('sasl_plain_username', self.broker_user)
+                params.setdefault('sasl_plain_password', self.broker_password)
+        return params
+
+    @staticmethod
+    def _create_many_clients(cnt, cls, *args, **params):
         client_id = params['client_id']
-        for x in range(cnt):
+        for _ in range(cnt):
             params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaAdminClient(**params)
+            yield cls(*args, **params)
+
+    def get_clients(self, cnt=1, **params):
+        params = self._enrich_client_params(params, client_id='client')
+        for client in self._create_many_clients(cnt, KafkaClient, **params):
+            yield client
+
+    def get_admin_clients(self, cnt, **params):
+        params = self._enrich_client_params(params, client_id='admin_client')
+        for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
+            yield client
 
     def get_consumers(self, cnt, topics, **params):
-        params.setdefault('client_id', 'consumer')
-        params.setdefault('heartbeat_interval_ms', 500)
-        params['bootstrap_servers'] = self.bootstrap_server()
-        client_id = params['client_id']
-        for x in range(cnt):
-            params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaConsumer(*topics, **params)
+        params = self._enrich_client_params(
+            params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
+        )
+        for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
+            yield client
 
     def get_producers(self, cnt, **params):
-        params.setdefault('client_id', 'producer')
-        params['bootstrap_servers'] = self.bootstrap_server()
-        client_id = params['client_id']
-        for x in range(cnt):
-            params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaProducer(**params)
+        params = self._enrich_client_params(params, client_id='producer')
+        for client in self._create_many_clients(cnt, KafkaProducer, **params):
+            yield client
diff --git a/test/service.py b/test/service.py
index 47fb846..045d780 100644
--- a/test/service.py
+++ b/test/service.py
@@ -45,6 +45,11 @@ class SpawnedService(threading.Thread):
         self.child = None
         self.alive = False
         self.daemon = True
+        log.info("Created service for command:")
+        log.info(" "+' '.join(self.args))
+        log.debug("With environment:")
+        for key, value in self.env.items():
+            log.debug(" {key}={value}".format(key=key, value=value))
 
     def _spawn(self):
         if self.alive: return
@@ -57,7 +62,7 @@
             bufsize=1,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE)
-        self.alive = True
+        self.alive = self.child.poll() is None
 
     def _despawn(self):
         if self.child.poll() is None:
@@ -83,12 +88,14 @@
                 raise
 
             if self.child.stdout in rds:
-                line = self.child.stdout.readline()
-                self.captured_stdout.append(line.decode('utf-8').rstrip())
+                line = self.child.stdout.readline().decode('utf-8').rstrip()
+                if line:
+                    self.captured_stdout.append(line)
 
             if self.child.stderr in rds:
-                line = self.child.stderr.readline()
-                self.captured_stderr.append(line.decode('utf-8').rstrip())
+                line = self.child.stderr.readline().decode('utf-8').rstrip()
+                if line:
+                    self.captured_stderr.append(line)
 
             if self.child.poll() is not None:
                 self.dump_logs()
@@ -105,6 +112,9 @@
     def wait_for(self, pattern, timeout=30):
         start = time.time()
         while True:
+            if not self.is_alive():
+                raise RuntimeError("Child thread died already.")
+
             elapsed = time.time() - start
             if elapsed >= timeout:
                 log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
new file mode 100644
index 0000000..e3a4813
--- /dev/null
+++ b/test/test_sasl_integration.py
@@ -0,0 +1,80 @@
+import logging
+import uuid
+
+import pytest
+
+from kafka.admin import NewTopic
+from kafka.protocol.metadata import MetadataRequest_v1
+from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
+
+
+@pytest.fixture(
+    params=[
+        pytest.param(
+            "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
+        ),
+        pytest.param(
+            "SCRAM-SHA-256",
+            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+        ),
+        pytest.param(
+            "SCRAM-SHA-512",
+            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+        ),
+    ]
+)
+def sasl_kafka(request, kafka_broker_factory):
+    sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
+    yield sasl_kafka
+    sasl_kafka.child.dump_logs()
+
+
+def test_admin(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    admin, = sasl_kafka.get_admin_clients(1)
+    admin.create_topics([NewTopic(topic_name, 1, 1)])
+    assert topic_name in sasl_kafka.get_topic_names()
+
+
+def test_produce_and_consume(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    sasl_kafka.create_topics([topic_name], num_partitions=2)
+    producer, = sasl_kafka.get_producers(1)
+
+    messages_and_futures = []  # [(message, produce_future),]
+    for i in range(100):
+        encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
+        future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
+        messages_and_futures.append((encoded_msg, future))
+    producer.flush()
+
+    for (msg, f) in messages_and_futures:
+        assert f.succeeded()
+
+    consumer, = sasl_kafka.get_consumers(1, [topic_name])
+    messages = {0: [], 1: []}
+    for i, message in enumerate(consumer, 1):
+        logging.debug("Consumed message %s", repr(message))
+        messages[message.partition].append(message)
+        if i >= 100:
+            break
+
+    assert_message_count(messages[0], 50)
+    assert_message_count(messages[1], 50)
+
+
+def test_client(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    sasl_kafka.create_topics([topic_name], num_partitions=1)
+
+    client, = sasl_kafka.get_clients(1)
+    request = MetadataRequest_v1(None)
+    client.send(0, request)
+    for _ in range(10):
+        result = client.poll(timeout_ms=10000)
+        if len(result) > 0:
+            break
+    else:
+        raise RuntimeError("Couldn't fetch topic response from Broker.")
+    result = result[0]
+    assert topic_name in [t[1] for t in result.topics]
diff --git a/test/testutil.py b/test/testutil.py
index 77a6673..ec4d70b 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -2,10 +2,15 @@ from __future__ import absolute_import
 
 import os
 import random
+import re
 import string
 import time
 
 
+def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
+    return _matcher.sub('_', string)
+
+
 def random_string(length):
     return "".join(random.choice(string.ascii_letters) for i in range(length))
 
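
For context, the SASL settings that _enrich_client_params injects above are the same
keyword arguments an ordinary kafka-python client would pass when connecting to the
fixture's broker. A minimal sketch, assuming a SASL_PLAINTEXT listener on
localhost:9092 and an existing topic 'my-topic' (address, topic name, and the chosen
mechanism are placeholders, not part of this change):

    from kafka import KafkaConsumer

    # Mirrors the defaults set by KafkaFixture._enrich_client_params when
    # sasl_enabled is true: security_protocol from the fixture transport,
    # the mechanism from the fixture parameter, and the credentials from
    # KafkaFixture.broker_user / KafkaFixture.broker_password.
    consumer = KafkaConsumer(
        'my-topic',                           # placeholder topic
        bootstrap_servers='localhost:9092',   # placeholder address
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='SCRAM-SHA-256',       # or 'PLAIN' / 'SCRAM-SHA-512'
        sasl_plain_username='alice',
        sasl_plain_password='alice-secret',
        auto_offset_reset='earliest',         # consumer default in the fixture
    )
    for message in consumer:
        print(message.value)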