Diffstat (limited to 'test/fixtures.py')
-rw-r--r--  test/fixtures.py | 298
1 file changed, 222 insertions(+), 76 deletions(-)
diff --git a/test/fixtures.py b/test/fixtures.py
index 557fca6..78cdc5c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -14,6 +14,7 @@ from kafka.vendor.six.moves import urllib, range
 from kafka.vendor.six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401
 
 from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
+from kafka.errors import InvalidReplicationFactorError
 from kafka.protocol.admin import CreateTopicsRequest
 from kafka.protocol.metadata import MetadataRequest
 from test.testutil import env_kafka_version, random_string
@@ -140,6 +141,16 @@ class Fixture(object):
         dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
         os.fsync(dirfd)
         os.close(dirfd)
+        log.debug("Template string:")
+        for line in template.splitlines():
+            log.debug(' ' + line.strip())
+        log.debug("Rendered template:")
+        with open(target_file.strpath, 'r') as o:
+            for line in o:
+                log.debug(' ' + line.strip())
+        log.debug("binding:")
+        for key, value in binding.items():
+            log.debug(" {key}={value}".format(key=key, value=value))
 
     def dump_logs(self):
         self.child.dump_logs()
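The debug output above only makes sense next to the rendering step it instruments. render_template itself is outside this hunk; a minimal sketch of the presumed behavior, assuming str.format-style {key} placeholders (which the {key}={value} binding dump above suggests):

    import os

    def render_template(source_path, target_path, binding):
        # Read the template and substitute {key} placeholders from `binding`.
        with open(source_path, 'r') as f:
            template = f.read()
        # Write the rendered config, then fsync the file *and* its directory
        # entry (the dirfd dance above), so the broker never reads a
        # half-written file.
        with open(target_path, 'w') as f:
            f.write(template.format(**binding))
            f.flush()
            os.fsync(f.fileno())
        dirfd = os.open(os.path.dirname(target_path), os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)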
@@ -233,11 +244,14 @@ class ZookeeperFixture(Fixture):
 
 
 class KafkaFixture(Fixture):
+    broker_user = 'alice'
+    broker_password = 'alice-secret'
+
     @classmethod
     def instance(cls, broker_id, zookeeper, zk_chroot=None,
                  host=None, port=None,
                  transport='PLAINTEXT', replicas=1, partitions=2,
-                 sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+                 sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
@@ -261,7 +275,7 @@ class KafkaFixture(Fixture):
 
     def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
                  replicas=1, partitions=2, transport='PLAINTEXT',
-                 sasl_mechanism='PLAIN', auto_create_topic=True,
+                 sasl_mechanism=None, auto_create_topic=True,
                  tmp_dir=None):
         super(KafkaFixture, self).__init__()
@@ -271,13 +285,18 @@ class KafkaFixture(Fixture):
         self.broker_id = broker_id
         self.auto_create_topic = auto_create_topic
         self.transport = transport.upper()
-        self.sasl_mechanism = sasl_mechanism.upper()
+        if sasl_mechanism is not None:
+            self.sasl_mechanism = sasl_mechanism.upper()
+        else:
+            self.sasl_mechanism = None
         self.ssl_dir = self.test_resource('ssl')
 
         # TODO: checking for port connection would be better than scanning logs
         # until then, we need the pattern to work across all supported broker versions
         # The logging format changed slightly in 1.0.0
         self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
+        # Need to wait until the broker has fetched user configs from zookeeper
+        # when SCRAM is used as the SASL mechanism
+        self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
 
         self.zookeeper = zookeeper
         self.zk_chroot = zk_chroot
@@ -292,6 +311,64 @@ class KafkaFixture(Fixture):
         self.running = False
         self._client = None
+        self.sasl_config = ''
+        self.jaas_config = ''
+
+    def _sasl_config(self):
+        if not self.sasl_enabled:
+            return ''
+
+        sasl_config = "sasl.enabled.mechanisms={mechanism}\n"
+        sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n"
+        return sasl_config.format(mechanism=self.sasl_mechanism)
+
+    def _jaas_config(self):
+        if not self.sasl_enabled:
+            return ''
+
+        elif self.sasl_mechanism == 'PLAIN':
+            jaas_config = (
+                "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
+                ' username="{user}" password="{password}" user_{user}="{password}";\n'
+            )
+        elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
+            jaas_config = (
+                "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
+                ' username="{user}" password="{password}";\n'
+            )
+        else:
+            raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism))
+        return jaas_config.format(user=self.broker_user, password=self.broker_password)
+
+    def _add_scram_user(self):
+        self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user))
+        args = self.kafka_run_class_args(
+            "kafka.admin.ConfigCommand",
+            "--zookeeper",
+            "%s:%d/%s" % (self.zookeeper.host,
+                          self.zookeeper.port,
+                          self.zk_chroot),
+            "--alter",
+            "--entity-type", "users",
+            "--entity-name", self.broker_user,
+            "--add-config",
+            "{}=[password={}]".format(self.sasl_mechanism, self.broker_password),
+        )
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        stdout, stderr = proc.communicate()
+
+        if proc.returncode != 0:
+            self.out("Failed to save credentials to zookeeper!")
+            self.out(stdout)
+            self.out(stderr)
+            raise RuntimeError("Failed to save credentials to zookeeper!")
+        self.out("User created.")
+
+    @property
+    def sasl_enabled(self):
+        return self.sasl_mechanism is not None
 
     def bootstrap_server(self):
         return '%s:%d' % (self.host, self.port)
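Interpolating the two helpers by hand shows exactly what lands in the broker config. A quick sanity check, assuming a fixture constructed with sasl_mechanism='SCRAM-SHA-256' (the `fixture` variable is illustrative):

    assert fixture._sasl_config() == (
        "sasl.enabled.mechanisms=SCRAM-SHA-256\n"
        "sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256\n"
    )
    assert fixture._jaas_config() == (
        "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
        ' username="alice" password="alice-secret";\n'
    )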
@@ -328,9 +405,17 @@ class KafkaFixture(Fixture):
     def start(self):
         # Configure Kafka child process
         properties = self.tmp_dir.join("kafka.properties")
-        template = self.test_resource("kafka.properties")
+        jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
+        properties_template = self.test_resource("kafka.properties")
+        jaas_conf_template = self.test_resource("kafka_server_jaas.conf")
+
         args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
         env = self.kafka_run_class_env()
+        if self.sasl_enabled:
+            opts = env.get('KAFKA_OPTS', '').strip()
+            opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath)
+            env['KAFKA_OPTS'] = opts
+            self.render_template(jaas_conf_template, jaas_conf, vars(self))
 
         timeout = 5
         max_timeout = 120
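The broker JVM discovers the JAAS file through the standard java.security.auth.login.config system property appended to KAFKA_OPTS. The kafka_server_jaas.conf template is not part of this diff, but it presumably wraps self.jaas_config in a KafkaServer section, so the rendered file would look roughly like:

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="alice" password="alice-secret";
    };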
@@ -345,14 +430,17 @@ class KafkaFixture(Fixture):
             if auto_port:
                 self.port = get_open_port()
             self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
-            self.render_template(template, properties, vars(self))
+            self.render_template(properties_template, properties, vars(self))
+
             self.child = SpawnedService(args, env)
             self.child.start()
             timeout = min(timeout, max(end_at - time.time(), 0))
-            if self.child.wait_for(self.start_pattern, timeout=timeout):
+            if self._broker_ready(timeout) and self._scram_user_present(timeout):
                 break
+
             self.child.dump_logs()
             self.child.stop()
+
             timeout *= 2
             time.sleep(backoff)
             tries += 1
@@ -360,11 +448,20 @@ class KafkaFixture(Fixture):
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
 
-        (self._client,) = self.get_clients(1, '_internal_client')
+        (self._client,) = self.get_clients(1, client_id='_internal_client')
 
         self.out("Done!")
         self.running = True
 
+    def _broker_ready(self, timeout):
+        return self.child.wait_for(self.start_pattern, timeout=timeout)
+
+    def _scram_user_present(self, timeout):
+        # no need to wait for scram user if scram is not used
+        if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'):
+            return True
+        return self.child.wait_for(self.scram_pattern, timeout=timeout)
+
     def open(self):
         if self.running:
             self.out("Instance already running")
@@ -378,18 +475,24 @@ class KafkaFixture(Fixture):
         self.tmp_dir.ensure('data', dir=True)
 
         self.out("Running local instance...")
-        log.info("  host       = %s", self.host)
-        log.info("  port       = %s", self.port or '(auto)')
-        log.info("  transport  = %s", self.transport)
-        log.info("  broker_id  = %s", self.broker_id)
-        log.info("  zk_host    = %s", self.zookeeper.host)
-        log.info("  zk_port    = %s", self.zookeeper.port)
-        log.info("  zk_chroot  = %s", self.zk_chroot)
-        log.info("  replicas   = %s", self.replicas)
-        log.info("  partitions = %s", self.partitions)
-        log.info("  tmp_dir    = %s", self.tmp_dir.strpath)
+        log.info("  host            = %s", self.host)
+        log.info("  port            = %s", self.port or '(auto)')
+        log.info("  transport       = %s", self.transport)
+        log.info("  sasl_mechanism  = %s", self.sasl_mechanism)
+        log.info("  broker_id       = %s", self.broker_id)
+        log.info("  zk_host         = %s", self.zookeeper.host)
+        log.info("  zk_port         = %s", self.zookeeper.port)
+        log.info("  zk_chroot       = %s", self.zk_chroot)
+        log.info("  replicas        = %s", self.replicas)
+        log.info("  partitions      = %s", self.partitions)
+        log.info("  tmp_dir         = %s", self.tmp_dir.strpath)
 
         self._create_zk_chroot()
+        self.sasl_config = self._sasl_config()
+        self.jaas_config = self._jaas_config()
+        # add user to zookeeper for the first server
+        if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
+            self._add_scram_user()
         self.start()
 
         atexit.register(self.close)
@@ -437,7 +540,8 @@ class KafkaFixture(Fixture):
                 future = self._client.send(node_id, request)
                 future.error_on_callbacks = True
                 future.add_errback(_failure)
-                return self._client.poll(future=future, timeout_ms=timeout)
+                self._client.poll(future=future, timeout_ms=timeout)
+                return future.value
             except Exception as exc:
                 time.sleep(1)
                 retries -= 1
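poll() returns every response received during the poll, not specifically this request's reply, so the reliable way to get the response is the request's own future. The pattern in miniature (client and node_id assumed already connected):

    future = client.send(node_id, request)        # returns kafka.future.Future
    client.poll(future=future, timeout_ms=5000)   # pump network I/O until resolved
    if future.failed():
        raise future.exception
    response = future.value                       # the decoded response object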
@@ -446,80 +550,122 @@ class KafkaFixture(Fixture):
             else:
                 pass # retry
 
-    def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+    def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
         if num_partitions is None:
             num_partitions = self.partitions
         if replication_factor is None:
             replication_factor = self.replicas
 
         # Try different methods to create a topic, from the fastest to the slowest
-        if self.auto_create_topic and \
-           num_partitions == self.partitions and \
-           replication_factor == self.replicas:
-            self._send_request(MetadataRequest[0]([topic_name]))
+        if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
+            self._create_topic_via_metadata(topic_name, timeout_ms)
         elif env_kafka_version() >= (0, 10, 1, 0):
-            request = CreateTopicsRequest[0]([(topic_name, num_partitions,
-                                               replication_factor, [], [])], timeout_ms)
-            result = self._send_request(request, timeout=timeout_ms)
-            for topic_result in result[0].topic_error_codes:
-                error_code = topic_result[1]
-                if error_code != 0:
-                    raise errors.for_code(error_code)
+            try:
+                self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
+            except InvalidReplicationFactorError:
+                # wait and try again
+                # on travis the brokers sometimes take a while to find themselves
+                time.sleep(0.5)
+                self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
         else:
-            args = self.kafka_run_class_args('kafka.admin.TopicCommand',
-                                             '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
-                                                                          self.zookeeper.port,
-                                                                          self.zk_chroot),
-                                             '--create',
-                                             '--topic', topic_name,
-                                             '--partitions', self.partitions \
-                                                 if num_partitions is None else num_partitions,
-                                             '--replication-factor', self.replicas \
-                                                 if replication_factor is None \
-                                                 else replication_factor)
-            if env_kafka_version() >= (0, 10):
-                args.append('--if-not-exists')
-            env = self.kafka_run_class_env()
-            proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            stdout, stderr = proc.communicate()
-            if proc.returncode != 0:
-                if 'kafka.common.TopicExistsException' not in stdout:
-                    self.out("Failed to create topic %s" % (topic_name,))
-                    self.out(stdout)
-                    self.out(stderr)
-                    raise RuntimeError("Failed to create topic %s" % (topic_name,))
+            self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
+
+    def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
+        self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+
+    def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+        request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+                                           replication_factor, [], [])], timeout_ms)
+        response = self._send_request(request, timeout=timeout_ms)
+        for topic_result in response.topic_errors:
+            error_code = topic_result[1]
+            if error_code != 0:
+                raise errors.for_code(error_code)
+
+    def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
+        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+                                                                      self.zookeeper.port,
+                                                                      self.zk_chroot),
+                                         '--create',
+                                         '--topic', topic_name,
+                                         '--partitions', self.partitions \
+                                             if num_partitions is None else num_partitions,
+                                         '--replication-factor', self.replicas \
+                                             if replication_factor is None \
+                                             else replication_factor)
+        if env_kafka_version() >= (0, 10):
+            args.append('--if-not-exists')
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            if 'kafka.common.TopicExistsException' not in stdout:
+                self.out("Failed to create topic %s" % (topic_name,))
+                self.out(stdout)
+                self.out(stderr)
+                raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+    def get_topic_names(self):
+        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+                                                                      self.zookeeper.port,
+                                                                      self.zk_chroot),
+                                         '--list'
+                                         )
+        env = self.kafka_run_class_env()
+        env.pop('KAFKA_LOG4J_OPTS')
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            self.out("Failed to list topics!")
+            self.out(stdout)
+            self.out(stderr)
+            raise RuntimeError("Failed to list topics!")
+        return stdout.decode().splitlines(False)
 
     def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
         for topic_name in topic_names:
             self._create_topic(topic_name, num_partitions, replication_factor)
 
-    def get_clients(self, cnt=1, client_id=None):
-        if client_id is None:
-            client_id = 'client'
-        return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
-                                 bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
-
-    def get_admin_clients(self, cnt=1, **params):
-        params.setdefault('client_id', 'admin_client')
-        params['bootstrap_servers'] = self.bootstrap_server()
+    def _enrich_client_params(self, params, **defaults):
+        params = params.copy()
+        for key, value in defaults.items():
+            params.setdefault(key, value)
+        params.setdefault('bootstrap_servers', self.bootstrap_server())
+        if self.sasl_enabled:
+            params.setdefault('sasl_mechanism', self.sasl_mechanism)
+            params.setdefault('security_protocol', self.transport)
+            if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+                params.setdefault('sasl_plain_username', self.broker_user)
+                params.setdefault('sasl_plain_password', self.broker_password)
+        return params
+
+    @staticmethod
+    def _create_many_clients(cnt, cls, *args, **params):
         client_id = params['client_id']
-        for x in range(cnt):
+        for _ in range(cnt):
             params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaAdminClient(**params)
+            yield cls(*args, **params)
+
+    def get_clients(self, cnt=1, **params):
+        params = self._enrich_client_params(params, client_id='client')
+        for client in self._create_many_clients(cnt, KafkaClient, **params):
+            yield client
+
+    def get_admin_clients(self, cnt, **params):
+        params = self._enrich_client_params(params, client_id='admin_client')
+        for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
+            yield client
 
     def get_consumers(self, cnt, topics, **params):
-        params.setdefault('client_id', 'consumer')
-        params.setdefault('heartbeat_interval_ms', 500)
-        params['bootstrap_servers'] = self.bootstrap_server()
-        client_id = params['client_id']
-        for x in range(cnt):
-            params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaConsumer(*topics, **params)
+        params = self._enrich_client_params(
+            params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
+        )
+        for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
+            yield client
 
     def get_producers(self, cnt, **params):
-        params.setdefault('client_id', 'producer')
-        params['bootstrap_servers'] = self.bootstrap_server()
-        client_id = params['client_id']
-        for x in range(cnt):
-            params['client_id'] = '%s_%s' % (client_id, random_string(4))
-            yield KafkaProducer(**params)
+        params = self._enrich_client_params(params, client_id='producer')
+        for client in self._create_many_clients(cnt, KafkaProducer, **params):
+            yield client
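Taken together, the new client factories make a SASL round trip a one-liner per client. A hypothetical end-to-end test sketch (topic name and assertion are illustrative; the factories are generators, hence the unpacking):

    zk = ZookeeperFixture.instance()
    kafka = KafkaFixture.instance(0, zk, transport='SASL_PLAINTEXT',
                                  sasl_mechanism='SCRAM-SHA-256')
    kafka.create_topics(['demo-topic'])

    # Each generated client inherits bootstrap_servers, security_protocol,
    # sasl_mechanism and the alice credentials via _enrich_client_params().
    producer, = kafka.get_producers(1)
    producer.send('demo-topic', b'hello sasl')
    producer.flush()

    consumer, = kafka.get_consumers(1, ['demo-topic'])
    record = next(iter(consumer))
    assert record.value == b'hello sasl'

    kafka.close()
    zk.close()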