summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py32
1 files changed, 20 insertions, 12 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index bf1acc8..d8ead59 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -242,6 +242,7 @@ class TestKafkaClient(unittest.TestCase):
# Offset Tests #
####################
+ @unittest.skip('commmit offset not supported in this version')
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req])
@@ -401,8 +402,9 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_acks_cluster_commit(self):
- producer = SimpleProducer(self.client, "test_acks_cluster_commit",
- req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+ producer = SimpleProducer(
+ self.client, "test_acks_cluster_commit",
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
resp = producer.send_messages("one")
self.assertEquals(len(resp), 1)
@@ -548,11 +550,11 @@ class TestKafkaClient(unittest.TestCase):
class TestConsumer(unittest.TestCase):
@classmethod
- def setUpClass(cls): # noqa
+ def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port)
+ cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
@classmethod
def tearDownClass(cls): # noqa
@@ -581,7 +583,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer")
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -604,6 +606,11 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13)
+ consumer.stop()
+
+ def test_simple_consumer_blocking(self):
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+
# Blocking API
start = datetime.now()
messages = consumer.get_messages(block=True, timeout=5)
@@ -612,13 +619,13 @@ class TestConsumer(unittest.TestCase):
self.assertEqual(len(messages), 0)
# Send 10 messages
- produce = ProduceRequest("test_simple_consumer", 0, messages=[
+ produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
- self.assertEquals(resp.offset, 100)
+ self.assertEquals(resp.offset, 0)
# Fetch 5 messages
messages = consumer.get_messages(count=5, block=True, timeout=5)
@@ -650,7 +657,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending")
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -676,7 +683,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer")
+ consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -732,7 +739,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = MultiProcessConsumer(self.client, "group1", "test_mppending")
+ consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -749,7 +756,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 10 messages that are too large (bigger than default fetch size)
- messages2=[create_message(random_string(5000)) for i in range(10)]
+ messages2 = [create_message(random_string(5000)) for i in range(10)]
produce2 = ProduceRequest("test_large_messages", 0, messages2)
for resp in self.client.send_produce_request([produce2]):
@@ -757,12 +764,13 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 10)
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
all_messages = messages1 + messages2
for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+
def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l))
return s