diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-17 16:11:57 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-17 16:11:57 -0700 |
commit | 8983e73437e485d1da30cc12dbf2e78bfada356c (patch) | |
tree | 9c991894ca41e02f6d18278977f07ca404a5c963 /test/testutil.py | |
parent | 7eaca8eea7adf6e1b8a487a78e9cde950d7221f7 (diff) | |
download | kafka-python-8983e73437e485d1da30cc12dbf2e78bfada356c.tar.gz |
Split up and speed up producer based integration tests
Diffstat (limited to 'test/testutil.py')
-rw-r--r-- | test/testutil.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/test/testutil.py b/test/testutil.py index 7d57ff6..4866b9d 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,6 +1,12 @@ +import uuid +import time +import unittest import os import random import string +import logging +from kafka.common import OffsetRequest +from kafka import KafkaClient def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) @@ -8,3 +14,44 @@ def random_string(l): def skip_integration(): return os.environ.get('SKIP_INTEGRATION') + +def ensure_topic_creation(client, topic_name, timeout = 30): + start_time = time.time() + + client.load_metadata_for_topics(topic_name) + while not client.has_metadata_for_topic(topic_name): + if time.time() > start_time + timeout: + raise Exception("Unable to create topic %s" % topic_name) + client.load_metadata_for_topics(topic_name) + time.sleep(1) + +class KafkaIntegrationTestCase(unittest.TestCase): + topic = None + + def setUp(self): + super(KafkaIntegrationTestCase, self).setUp() + if not self.topic: + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + + self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port)) + ensure_topic_creation(self.client, self.topic) + self._messages = {} + + def tearDown(self): + super(KafkaIntegrationTestCase, self).tearDown() + self.client.close() + + def current_offset(self, topic, partition): + offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ]) + return offsets.offsets[0] + + def msgs(self, iterable): + return [ self.msg(x) for x in iterable ] + + def msg(self, s): + if s not in self._messages: + self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4())) + + return self._messages[s] + +logging.basicConfig(level=logging.DEBUG) |