summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..d141c36 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+ cls.client = KafkaClient(cls.server2.host, cls.server2.port)
@classmethod
def tearDownClass(cls): # noqa
@@ -583,7 +583,9 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer", auto_commit=False,
+ iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -609,7 +611,9 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
def test_simple_consumer_blocking(self):
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer_blocking",
+ auto_commit=False, iter_timeout=0)
# Blocking API
start = datetime.now()
@@ -657,7 +661,8 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending",
+ auto_commit=False, iter_timeout=0)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -764,7 +769,8 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 10)
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ auto_commit=False, iter_timeout=0)
all_messages = messages1 + messages2
for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message)
@@ -869,7 +875,7 @@ class TestFailover(unittest.TestCase):
def _count_messages(self, group, topic):
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)