summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_integration.py66
1 files changed, 36 insertions, 30 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index b1d1a59..e01ce41 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -50,9 +50,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(100, 200))
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1",
- self.topic, auto_commit=False,
- iter_timeout=0)
+ consumer = self.consumer()
self.assert_message_count([ message for message in consumer ], 200)
@@ -63,9 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
- consumer = SimpleConsumer(self.client, "group1",
- self.topic, auto_commit=False,
- iter_timeout=0)
+ consumer = self.consumer()
# Rewind 10 messages from the end
consumer.seek(-10, 2)
@@ -79,9 +75,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_simple_consumer_blocking(self):
- consumer = SimpleConsumer(self.client, "group1",
- self.topic,
- auto_commit=False, iter_timeout=0)
+ consumer = self.consumer()
# Ask for 5 messages, nothing in queue, block 5 seconds
with Timer() as t:
@@ -111,8 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = SimpleConsumer(self.client, "group1", self.topic,
- auto_commit=False, iter_timeout=0)
+ consumer = self.consumer()
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
@@ -126,7 +119,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
- consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
+ consumer = self.consumer(consumer = MultiProcessConsumer)
self.assert_message_count([ message for message in consumer ], 200)
@@ -134,7 +127,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_multi_process_consumer_blocking(self):
- consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
+ consumer = self.consumer(consumer = MultiProcessConsumer)
# Ask for 5 messages, No messages in queue, block 5 seconds
with Timer() as t:
@@ -182,8 +175,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", self.topic,
- auto_commit=False, iter_timeout=0)
+ consumer = self.consumer()
expected_messages = set(small_messages + large_messages)
actual_messages = set([ x.message.value for x in consumer ])
@@ -198,8 +190,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
])
# Create a consumer with the default buffer size
- consumer = SimpleConsumer(self.client, "group1", self.topic,
- auto_commit=False, iter_timeout=0)
+ consumer = self.consumer()
# This consumer failes to get the message
with self.assertRaises(ConsumerFetchSizeTooSmall):
@@ -208,9 +199,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
# Create a consumer with no fetch size limit
- big_consumer = SimpleConsumer(self.client, "group1", self.topic,
- max_buffer_size=None, partitions=[0],
- auto_commit=False, iter_timeout=0)
+ big_consumer = self.consumer(
+ max_buffer_size = None,
+ partitions = [0],
+ )
# Seek to the last message
big_consumer.seek(-1, 2)
@@ -228,25 +220,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
msgs2 = self.send_messages(1, range(100, 200))
# Start a consumer
- consumer1 = SimpleConsumer(self.client, "group1",
- self.topic, auto_commit=True,
- auto_commit_every_t=600,
- auto_commit_every_n=20,
- iter_timeout=0)
+ consumer1 = self.consumer(
+ auto_commit_every_t = 600,
+ auto_commit_every_n = 20,
+ )
# Grab the first 195 messages
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
self.assert_message_count(output_msgs1, 195)
# The offset should be at 180
- consumer2 = SimpleConsumer(self.client, "group1",
- self.topic, auto_commit=True,
- auto_commit_every_t=600,
- auto_commit_every_n=20,
- iter_timeout=0)
+ consumer2 = self.consumer(
+ auto_commit_every_t = 600,
+ auto_commit_every_n = 20,
+ )
# 180-200
self.assert_message_count([ message for message in consumer2 ], 20)
consumer1.stop()
consumer2.stop()
+
+ def consumer(self, **kwargs):
+ if os.environ['KAFKA_VERSION'] == "0.8.0":
+ # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['auto_commit'] = False
+ else:
+ kwargs.setdefault('auto_commit', True)
+
+ consumer_class = kwargs.pop('consumer', SimpleConsumer)
+ group = kwargs.pop('group', self.id())
+ topic = kwargs.pop('topic', self.topic)
+
+ if consumer_class == SimpleConsumer:
+ kwargs.setdefault('iter_timeout', 0)
+
+ return consumer_class(self.client, group, topic, **kwargs)