Diffstat (limited to 'test')
-rw-r--r--  test/__init__.py                 9
-rw-r--r--  test/fixtures.py               298
-rw-r--r--  test/service.py                 20
-rw-r--r--  test/test_sasl_integration.py   80
-rw-r--r--  test/testutil.py                 5
5 files changed, 323 insertions(+), 89 deletions(-)
diff --git a/test/__init__.py b/test/__init__.py
index 71f667d..329277d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -2,14 +2,7 @@ from __future__ import absolute_import
# Set default logging handler to avoid "No handler found" warnings.
import logging
-try: # Python 2.7+
- from logging import NullHandler
-except ImportError:
- class NullHandler(logging.Handler):
- def emit(self, record):
- pass
-
-logging.getLogger(__name__).addHandler(NullHandler())
+logging.basicConfig(level=logging.INFO)
from kafka.future import Future
Future.error_on_callbacks = True # always fail during testing
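A quick sketch (the module name is illustrative) of what the switch from a package-level NullHandler to logging.basicConfig means in practice: the root logger now has a handler, so INFO-level output from the fixtures and services below reaches the test output without any per-module setup.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("test.fixtures")
log.info("visible in test output via the root handler")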
diff --git a/test/fixtures.py b/test/fixtures.py
index 557fca6..78cdc5c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -14,6 +14,7 @@ from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
+from kafka.errors import InvalidReplicationFactorError
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
from test.testutil import env_kafka_version, random_string
@@ -140,6 +141,16 @@ class Fixture(object):
dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
+ log.debug("Template string:")
+ for line in template.splitlines():
+ log.debug(' ' + line.strip())
+ log.debug("Rendered template:")
+ with open(target_file.strpath, 'r') as o:
+ for line in o:
+ log.debug(' ' + line.strip())
+ log.debug("binding:")
+ for key, value in binding.items():
+ log.debug(" {key}={value}".format(key=key, value=value))
def dump_logs(self):
self.child.dump_logs()
@@ -233,11 +244,14 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
+ broker_user = 'alice'
+ broker_password = 'alice-secret'
+
@classmethod
def instance(cls, broker_id, zookeeper, zk_chroot=None,
host=None, port=None,
transport='PLAINTEXT', replicas=1, partitions=2,
- sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+ sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
@@ -261,7 +275,7 @@ class KafkaFixture(Fixture):
def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
replicas=1, partitions=2, transport='PLAINTEXT',
- sasl_mechanism='PLAIN', auto_create_topic=True,
+ sasl_mechanism=None, auto_create_topic=True,
tmp_dir=None):
super(KafkaFixture, self).__init__()
@@ -271,13 +285,18 @@ class KafkaFixture(Fixture):
self.broker_id = broker_id
self.auto_create_topic = auto_create_topic
self.transport = transport.upper()
- self.sasl_mechanism = sasl_mechanism.upper()
+ if sasl_mechanism is not None:
+ self.sasl_mechanism = sasl_mechanism.upper()
+ else:
+ self.sasl_mechanism = None
self.ssl_dir = self.test_resource('ssl')
# TODO: checking for port connection would be better than scanning logs
# until then, we need the pattern to work across all supported broker versions
# The logging format changed slightly in 1.0.0
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
+ # Need to wait until the broker has fetched user configs from ZooKeeper when SCRAM is used as the SASL mechanism
+ self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
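For context on start_pattern and scram_pattern above, here is a minimal sketch of the match the fixture waits for; the sample broker log line is an assumption, chosen only to show the shape of the quota-removal message that appears once user configs have been loaded from ZooKeeper.

import re

scram_pattern = r"Removing Produce quota for user %s" % "alice"
sample = "INFO Removing Produce quota for user alice (kafka.server.ClientQuotaManager)"
assert re.search(scram_pattern, sample) is not None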
@@ -292,6 +311,64 @@ class KafkaFixture(Fixture):
self.running = False
self._client = None
+ self.sasl_config = ''
+ self.jaas_config = ''
+
+ def _sasl_config(self):
+ if not self.sasl_enabled:
+ return ''
+
+ sasl_config = "sasl.enabled.mechanisms={mechanism}\n"
+ sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n"
+ return sasl_config.format(mechanism=self.sasl_mechanism)
+
+ def _jaas_config(self):
+ if not self.sasl_enabled:
+ return ''
+
+ elif self.sasl_mechanism == 'PLAIN':
+ jaas_config = (
+ "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
+ ' username="{user}" password="{password}" user_{user}="{password}";\n'
+ )
+ elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
+ jaas_config = (
+ "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
+ ' username="{user}" password="{password}";\n'
+ )
+ else:
+ raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism))
+ return jaas_config.format(user=self.broker_user, password=self.broker_password)
+
+ def _add_scram_user(self):
+ self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user))
+ args = self.kafka_run_class_args(
+ "kafka.admin.ConfigCommand",
+ "--zookeeper",
+ "%s:%d/%s" % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ "--alter",
+ "--entity-type", "users",
+ "--entity-name", self.broker_user,
+ "--add-config",
+ "{}=[password={}]".format(self.sasl_mechanism, self.broker_password),
+ )
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ stdout, stderr = proc.communicate()
+
+ if proc.returncode != 0:
+ self.out("Failed to save credentials to zookeeper!")
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to save credentials to zookeeper!")
+ self.out("User created.")
+
+ @property
+ def sasl_enabled(self):
+ return self.sasl_mechanism is not None
def bootstrap_server(self):
return '%s:%d' % (self.host, self.port)
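To make the two helpers above concrete, this is what they evaluate to for a SCRAM-SHA-256 broker with the default fixture credentials; only the mechanism and the user/password vary.

# _sasl_config() -> broker properties enabling the mechanism
sasl_config = (
    "sasl.enabled.mechanisms=SCRAM-SHA-256\n"
    "sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256\n"
)

# _jaas_config() -> login module stanza rendered into kafka_server_jaas.conf
jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
    ' username="alice" password="alice-secret";\n'
)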
@@ -328,9 +405,17 @@ class KafkaFixture(Fixture):
def start(self):
# Configure Kafka child process
properties = self.tmp_dir.join("kafka.properties")
- template = self.test_resource("kafka.properties")
+ jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
+ properties_template = self.test_resource("kafka.properties")
+ jaas_conf_template = self.test_resource("kafka_server_jaas.conf")
+
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
+ if self.sasl_enabled:
+ opts = env.get('KAFKA_OPTS', '').strip()
+ opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath)
+ env['KAFKA_OPTS'] = opts
+ self.render_template(jaas_conf_template, jaas_conf, vars(self))
timeout = 5
max_timeout = 120
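A rough sketch of the KAFKA_OPTS handling above, which is how the broker JVM picks up the rendered JAAS file; the pre-existing opts value and the file path are assumptions.

env = {'KAFKA_OPTS': '-Xmx512M'}                         # whatever was already set
jaas_conf_path = '/tmp/fixture/kafka_server_jaas.conf'   # rendered template path
opts = env.get('KAFKA_OPTS', '').strip()
opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf_path)
env['KAFKA_OPTS'] = opts
# -> '-Xmx512M -Djava.security.auth.login.config=/tmp/fixture/kafka_server_jaas.conf'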
@@ -345,14 +430,17 @@ class KafkaFixture(Fixture):
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
- self.render_template(template, properties, vars(self))
+ self.render_template(properties_template, properties, vars(self))
+
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
- if self.child.wait_for(self.start_pattern, timeout=timeout):
+ if self._broker_ready(timeout) and self._scram_user_present(timeout):
break
+
self.child.dump_logs()
self.child.stop()
+
timeout *= 2
time.sleep(backoff)
tries += 1
@@ -360,11 +448,20 @@ class KafkaFixture(Fixture):
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
- (self._client,) = self.get_clients(1, '_internal_client')
+ (self._client,) = self.get_clients(1, client_id='_internal_client')
self.out("Done!")
self.running = True
+ def _broker_ready(self, timeout):
+ return self.child.wait_for(self.start_pattern, timeout=timeout)
+
+ def _scram_user_present(self, timeout):
+ # No need to wait for the SCRAM user if SCRAM is not in use
+ if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'):
+ return True
+ return self.child.wait_for(self.scram_pattern, timeout=timeout)
+
def open(self):
if self.running:
self.out("Instance already running")
@@ -378,18 +475,24 @@ class KafkaFixture(Fixture):
self.tmp_dir.ensure('data', dir=True)
self.out("Running local instance...")
- log.info(" host = %s", self.host)
- log.info(" port = %s", self.port or '(auto)')
- log.info(" transport = %s", self.transport)
- log.info(" broker_id = %s", self.broker_id)
- log.info(" zk_host = %s", self.zookeeper.host)
- log.info(" zk_port = %s", self.zookeeper.port)
- log.info(" zk_chroot = %s", self.zk_chroot)
- log.info(" replicas = %s", self.replicas)
- log.info(" partitions = %s", self.partitions)
- log.info(" tmp_dir = %s", self.tmp_dir.strpath)
+ log.info(" host = %s", self.host)
+ log.info(" port = %s", self.port or '(auto)')
+ log.info(" transport = %s", self.transport)
+ log.info(" sasl_mechanism = %s", self.sasl_mechanism)
+ log.info(" broker_id = %s", self.broker_id)
+ log.info(" zk_host = %s", self.zookeeper.host)
+ log.info(" zk_port = %s", self.zookeeper.port)
+ log.info(" zk_chroot = %s", self.zk_chroot)
+ log.info(" replicas = %s", self.replicas)
+ log.info(" partitions = %s", self.partitions)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
self._create_zk_chroot()
+ self.sasl_config = self._sasl_config()
+ self.jaas_config = self._jaas_config()
+ # add user to zookeeper for the first server
+ if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
+ self._add_scram_user()
self.start()
atexit.register(self.close)
@@ -437,7 +540,8 @@ class KafkaFixture(Fixture):
future = self._client.send(node_id, request)
future.error_on_callbacks = True
future.add_errback(_failure)
- return self._client.poll(future=future, timeout_ms=timeout)
+ self._client.poll(future=future, timeout_ms=timeout)
+ return future.value
except Exception as exc:
time.sleep(1)
retries -= 1
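The change above makes _send_request return the decoded response rather than the raw poll() result. For reference, a minimal self-contained sketch of that send/poll pattern using the kafka-python Future API:

def send_and_wait(client, node_id, request, timeout_ms=10000):
    # client is a kafka.KafkaClient; returns the decoded response on success
    future = client.send(node_id, request)
    client.poll(future=future, timeout_ms=timeout_ms)
    if future.failed():
        raise future.exception
    return future.value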
@@ -446,80 +550,122 @@ class KafkaFixture(Fixture):
else:
pass # retry
- def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
if num_partitions is None:
num_partitions = self.partitions
if replication_factor is None:
replication_factor = self.replicas
# Try different methods to create a topic, from the fastest to the slowest
- if self.auto_create_topic and \
- num_partitions == self.partitions and \
- replication_factor == self.replicas:
- self._send_request(MetadataRequest[0]([topic_name]))
+ if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
+ self._create_topic_via_metadata(topic_name, timeout_ms)
elif env_kafka_version() >= (0, 10, 1, 0):
- request = CreateTopicsRequest[0]([(topic_name, num_partitions,
- replication_factor, [], [])], timeout_ms)
- result = self._send_request(request, timeout=timeout_ms)
- for topic_result in result[0].topic_error_codes:
- error_code = topic_result[1]
- if error_code != 0:
- raise errors.for_code(error_code)
+ try:
+ self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
+ except InvalidReplicationFactorError:
+ # Wait and try again:
+ # on Travis the brokers sometimes take a while to discover each other
+ time.sleep(0.5)
+ self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
else:
- args = self.kafka_run_class_args('kafka.admin.TopicCommand',
- '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
- self.zookeeper.port,
- self.zk_chroot),
- '--create',
- '--topic', topic_name,
- '--partitions', self.partitions \
- if num_partitions is None else num_partitions,
- '--replication-factor', self.replicas \
- if replication_factor is None \
- else replication_factor)
- if env_kafka_version() >= (0, 10):
- args.append('--if-not-exists')
- env = self.kafka_run_class_env()
- proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = proc.communicate()
- if proc.returncode != 0:
- if 'kafka.common.TopicExistsException' not in stdout:
- self.out("Failed to create topic %s" % (topic_name,))
- self.out(stdout)
- self.out(stderr)
- raise RuntimeError("Failed to create topic %s" % (topic_name,))
+ self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
+
+ def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
+ self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+
+ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+ replication_factor, [], [])], timeout_ms)
+ response = self._send_request(request, timeout=timeout_ms)
+ for topic_result in response.topic_errors:
+ error_code = topic_result[1]
+ if error_code != 0:
+ raise errors.for_code(error_code)
+
+ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--create',
+ '--topic', topic_name,
+ '--partitions', self.partitions \
+ if num_partitions is None else num_partitions,
+ '--replication-factor', self.replicas \
+ if replication_factor is None \
+ else replication_factor)
+ if env_kafka_version() >= (0, 10):
+ args.append('--if-not-exists')
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ if proc.returncode != 0:
+ if 'kafka.common.TopicExistsException' not in stdout:
+ self.out("Failed to create topic %s" % (topic_name,))
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+ def get_topic_names(self):
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--list'
+ )
+ env = self.kafka_run_class_env()
+ env.pop('KAFKA_LOG4J_OPTS')
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ if proc.returncode != 0:
+ self.out("Failed to list topics!")
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to list topics!")
+ return stdout.decode().splitlines(False)
def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
for topic_name in topic_names:
self._create_topic(topic_name, num_partitions, replication_factor)
- def get_clients(self, cnt=1, client_id=None):
- if client_id is None:
- client_id = 'client'
- return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
- bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
-
- def get_admin_clients(self, cnt=1, **params):
- params.setdefault('client_id', 'admin_client')
- params['bootstrap_servers'] = self.bootstrap_server()
+ def _enrich_client_params(self, params, **defaults):
+ params = params.copy()
+ for key, value in defaults.items():
+ params.setdefault(key, value)
+ params.setdefault('bootstrap_servers', self.bootstrap_server())
+ if self.sasl_enabled:
+ params.setdefault('sasl_mechanism', self.sasl_mechanism)
+ params.setdefault('security_protocol', self.transport)
+ if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+ params.setdefault('sasl_plain_username', self.broker_user)
+ params.setdefault('sasl_plain_password', self.broker_password)
+ return params
+
+ @staticmethod
+ def _create_many_clients(cnt, cls, *args, **params):
client_id = params['client_id']
- for x in range(cnt):
+ for _ in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaAdminClient(**params)
+ yield cls(*args, **params)
+
+ def get_clients(self, cnt=1, **params):
+ params = self._enrich_client_params(params, client_id='client')
+ for client in self._create_many_clients(cnt, KafkaClient, **params):
+ yield client
+
+ def get_admin_clients(self, cnt, **params):
+ params = self._enrich_client_params(params, client_id='admin_client')
+ for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
+ yield client
def get_consumers(self, cnt, topics, **params):
- params.setdefault('client_id', 'consumer')
- params.setdefault('heartbeat_interval_ms', 500)
- params['bootstrap_servers'] = self.bootstrap_server()
- client_id = params['client_id']
- for x in range(cnt):
- params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaConsumer(*topics, **params)
+ params = self._enrich_client_params(
+ params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
+ )
+ for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
+ yield client
def get_producers(self, cnt, **params):
- params.setdefault('client_id', 'producer')
- params['bootstrap_servers'] = self.bootstrap_server()
- client_id = params['client_id']
- for x in range(cnt):
- params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaProducer(**params)
+ params = self._enrich_client_params(params, client_id='producer')
+ for client in self._create_many_clients(cnt, KafkaProducer, **params):
+ yield client
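For reference, a hedged sketch of the keyword arguments a SCRAM-SHA-256 fixture would end up passing to KafkaConsumer through _enrich_client_params; the bootstrap address and the client_id suffix are illustrative.

consumer_params = {
    'bootstrap_servers': 'localhost:9092',
    'client_id': 'consumer_ab3f',
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'alice',
    'sasl_plain_password': 'alice-secret',
    'heartbeat_interval_ms': 500,
    'auto_offset_reset': 'earliest',
}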
diff --git a/test/service.py b/test/service.py
index 47fb846..045d780 100644
--- a/test/service.py
+++ b/test/service.py
@@ -45,6 +45,11 @@ class SpawnedService(threading.Thread):
self.child = None
self.alive = False
self.daemon = True
+ log.info("Created service for command:")
+ log.info(" "+' '.join(self.args))
+ log.debug("With environment:")
+ for key, value in self.env.items():
+ log.debug(" {key}={value}".format(key=key, value=value))
def _spawn(self):
if self.alive: return
@@ -57,7 +62,7 @@ class SpawnedService(threading.Thread):
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- self.alive = True
+ self.alive = self.child.poll() is None  # only mark alive if the process is actually still running
def _despawn(self):
if self.child.poll() is None:
@@ -83,12 +88,14 @@ class SpawnedService(threading.Thread):
raise
if self.child.stdout in rds:
- line = self.child.stdout.readline()
- self.captured_stdout.append(line.decode('utf-8').rstrip())
+ line = self.child.stdout.readline().decode('utf-8').rstrip()
+ if line:
+ self.captured_stdout.append(line)
if self.child.stderr in rds:
- line = self.child.stderr.readline()
- self.captured_stderr.append(line.decode('utf-8').rstrip())
+ line = self.child.stderr.readline().decode('utf-8').rstrip()
+ if line:
+ self.captured_stderr.append(line)
if self.child.poll() is not None:
self.dump_logs()
@@ -105,6 +112,9 @@ class SpawnedService(threading.Thread):
def wait_for(self, pattern, timeout=30):
start = time.time()
while True:
+ if not self.is_alive():
+ raise RuntimeError("Child thread died already.")
+
elapsed = time.time() - start
if elapsed >= timeout:
log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
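An illustrative caller of SpawnedService.wait_for under the new fail-fast check; args, env, and the pattern are placeholders. If the child process dies before the pattern appears, wait_for now raises instead of spinning until the timeout.

service = SpawnedService(args, env)   # args/env are placeholders
service.start()
if not service.wait_for(r'started', timeout=30):  # raises RuntimeError if the child died
    service.dump_logs()
    raise RuntimeError('service did not report startup in time')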
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
new file mode 100644
index 0000000..e3a4813
--- /dev/null
+++ b/test/test_sasl_integration.py
@@ -0,0 +1,80 @@
+import logging
+import uuid
+
+import pytest
+
+from kafka.admin import NewTopic
+from kafka.protocol.metadata import MetadataRequest_v1
+from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
+
+
+@pytest.fixture(
+ params=[
+ pytest.param(
+ "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
+ ),
+ pytest.param(
+ "SCRAM-SHA-256",
+ marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+ ),
+ pytest.param(
+ "SCRAM-SHA-512",
+ marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+ ),
+ ]
+)
+def sasl_kafka(request, kafka_broker_factory):
+ sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
+ yield sasl_kafka
+ sasl_kafka.child.dump_logs()
+
+
+def test_admin(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ admin, = sasl_kafka.get_admin_clients(1)
+ admin.create_topics([NewTopic(topic_name, 1, 1)])
+ assert topic_name in sasl_kafka.get_topic_names()
+
+
+def test_produce_and_consume(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ sasl_kafka.create_topics([topic_name], num_partitions=2)
+ producer, = sasl_kafka.get_producers(1)
+
+ messages_and_futures = [] # [(message, produce_future),]
+ for i in range(100):
+ encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
+ future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
+ messages_and_futures.append((encoded_msg, future))
+ producer.flush()
+
+ for (msg, f) in messages_and_futures:
+ assert f.succeeded()
+
+ consumer, = sasl_kafka.get_consumers(1, [topic_name])
+ messages = {0: [], 1: []}
+ for i, message in enumerate(consumer, 1):
+ logging.debug("Consumed message %s", repr(message))
+ messages[message.partition].append(message)
+ if i >= 100:
+ break
+
+ assert_message_count(messages[0], 50)
+ assert_message_count(messages[1], 50)
+
+
+def test_client(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ sasl_kafka.create_topics([topic_name], num_partitions=1)
+
+ client, = sasl_kafka.get_clients(1)
+ request = MetadataRequest_v1(None)
+ client.send(0, request)
+ for _ in range(10):
+ result = client.poll(timeout_ms=10000)
+ if len(result) > 0:
+ break
+ else:
+ raise RuntimeError("Couldn't fetch topic response from Broker.")
+ result = result[0]
+ assert topic_name in [t[1] for t in result.topics]
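One note on test_client above: in kafka-python, MetadataRequest_v1(None) requests metadata for all topics (an empty topic list would mean "no topics"), which is why the freshly created topic is expected to appear in result.topics.

from kafka.protocol.metadata import MetadataRequest_v1

all_topics_request = MetadataRequest_v1(None)   # None -> metadata for every topic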
diff --git a/test/testutil.py b/test/testutil.py
index 77a6673..ec4d70b 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -2,10 +2,15 @@ from __future__ import absolute_import
import os
import random
+import re
import string
import time
+def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
+ return _matcher.sub('_', string)
+
+
def random_string(length):
return "".join(random.choice(string.ascii_letters) for i in range(length))
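A small usage example of the new special_to_underscore helper above, since pytest node names contain brackets that are awkward in topic names:

special_to_underscore("test_admin[PLAIN]x9Qz")  # -> 'test_admin_PLAIN_x9Qz'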