summaryrefslogtreecommitdiff
path: root/test/testutil.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-17 16:11:57 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-17 16:11:57 -0700
commit8983e73437e485d1da30cc12dbf2e78bfada356c (patch)
tree9c991894ca41e02f6d18278977f07ca404a5c963 /test/testutil.py
parent7eaca8eea7adf6e1b8a487a78e9cde950d7221f7 (diff)
downloadkafka-python-8983e73437e485d1da30cc12dbf2e78bfada356c.tar.gz
Split up and speed up producer based integration tests
Diffstat (limited to 'test/testutil.py')
-rw-r--r--test/testutil.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/test/testutil.py b/test/testutil.py
index 7d57ff6..4866b9d 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,6 +1,12 @@
+import uuid
+import time
+import unittest
import os
import random
import string
+import logging
+from kafka.common import OffsetRequest
+from kafka import KafkaClient
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
@@ -8,3 +14,44 @@ def random_string(l):
def skip_integration():
return os.environ.get('SKIP_INTEGRATION')
+
+def ensure_topic_creation(client, topic_name, timeout = 30):
+ start_time = time.time()
+
+ client.load_metadata_for_topics(topic_name)
+ while not client.has_metadata_for_topic(topic_name):
+ if time.time() > start_time + timeout:
+ raise Exception("Unable to create topic %s" % topic_name)
+ client.load_metadata_for_topics(topic_name)
+ time.sleep(1)
+
+class KafkaIntegrationTestCase(unittest.TestCase):
+ topic = None
+
+ def setUp(self):
+ super(KafkaIntegrationTestCase, self).setUp()
+ if not self.topic:
+ self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
+
+ self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
+ ensure_topic_creation(self.client, self.topic)
+ self._messages = {}
+
+ def tearDown(self):
+ super(KafkaIntegrationTestCase, self).tearDown()
+ self.client.close()
+
+ def current_offset(self, topic, partition):
+ offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ])
+ return offsets.offsets[0]
+
+ def msgs(self, iterable):
+ return [ self.msg(x) for x in iterable ]
+
+ def msg(self, s):
+ if s not in self._messages:
+ self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
+
+ return self._messages[s]
+
+logging.basicConfig(level=logging.DEBUG)