summaryrefslogtreecommitdiff
path: root/test/fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/fixtures.py')
-rw-r--r--test/fixtures.py10
1 files changed, 9 insertions, 1 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index c7748f1..68572b5 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -13,7 +13,7 @@ import py
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
-from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
from kafka.client_async import KafkaClient
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
@@ -500,6 +500,14 @@ class KafkaFixture(Fixture):
return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+ def get_admin_clients(self, cnt=1, **params):
+ params.setdefault('client_id', 'admin_client')
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaAdminClient(**params)
+
def get_consumers(self, cnt, topics, **params):
params.setdefault('client_id', 'consumer')
params.setdefault('heartbeat_interval_ms', 500)