summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-24 12:49:44 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-03-24 12:49:44 +0300
commit9641e9fa296a035e73838f07b77310cb5c9eb655 (patch)
tree00d325042e6f87c6e56fdcfb1f75d2a7dbc9b09d /test/test_consumer_integration.py
parente43f405a0cf2912841bf4734d67384a8074e8616 (diff)
downloadkafka-python-9641e9fa296a035e73838f07b77310cb5c9eb655.tar.gz
Moving to **kwargs for MPConsumer options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py8
1 files changed, 3 insertions, 5 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 7dd4a79..d3df56a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -61,10 +61,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)
- if consumer_class == SimpleConsumer:
+ if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
kwargs.setdefault('iter_timeout', 0)
- elif consumer_class == MultiProcessConsumer:
- kwargs.setdefault('simple_consumer_options', {'iter_timeout': 0})
return consumer_class(self.client, group, topic, **kwargs)
@@ -245,8 +243,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False,
- simple_consumer_options={'iter_timeout': 0})
+ consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+ auto_commit=False, iter_timeout=0)
self.assertEqual(consumer.pending(), 20)
self.assertEqual(consumer.pending(partitions=[0]), 10)