diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index a10dae2..d141c36 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient(cls.server2.host, cls.server2.port) @classmethod def tearDownClass(cls): # noqa @@ -583,7 +583,9 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer", auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -609,7 +611,9 @@ class TestConsumer(unittest.TestCase): consumer.stop() def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer_blocking", + auto_commit=False, iter_timeout=0) # Blocking API start = datetime.now() @@ -657,7 +661,8 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -764,7 +769,8 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) @@ -869,7 +875,7 @@ class TestFailover(unittest.TestCase): def _count_messages(self, group, topic): client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) |