summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-03-30 17:42:51 -0700
committerDana Powers <dana.powers@gmail.com>2015-03-30 17:42:51 -0700
commit9fd08119170b64c56ea024d12ef6b0e6482d778b (patch)
tree5fb240b11a7f9e7a5ca9d2c348556967b0f94b0c /test
parent6fc6856746c9e27d4c96b47b1941b6ebbabcb33b (diff)
parent1d252bfc20c8b1058dc93a495c3bdb0f4ccdf590 (diff)
downloadkafka-python-9fd08119170b64c56ea024d12ef6b0e6482d778b.tar.gz
Merge pull request #356 from dpkp/always_fetch_offsets
fetch commit offsets in base consumer unless group is None
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer_integration.py75
-rw-r--r--test/test_failover_integration.py2
2 files changed, 75 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17a8ac9..403ce0f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def consumer(self, **kwargs):
if os.environ['KAFKA_VERSION'] == "0.8.0":
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['group'] = None
kwargs['auto_commit'] = False
else:
kwargs.setdefault('auto_commit', True)
@@ -127,6 +128,23 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_simple_consumer_load_initial_offsets(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Create 1st consumer and change offsets
+ consumer = self.consumer()
+ self.assertEqual(consumer.offsets, {0: 0, 1: 0})
+ consumer.offsets.update({0:51, 1:101})
+ # Update counter after manual offsets update
+ consumer.count_since_commit += 1
+ consumer.commit()
+
+ # Create 2nd consumer and check initial offsets
+ consumer = self.consumer(auto_commit=False)
+ self.assertEqual(consumer.offsets, {0: 51, 1: 101})
+
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
@@ -243,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+ # set group to None and auto_commit to False to avoid interactions w/
+ # offset commit/fetch apis
+ consumer = MultiProcessConsumer(self.client, None, self.topic,
auto_commit=False, iter_timeout=0)
self.assertEqual(consumer.pending(), 20)
@@ -252,6 +272,24 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_multi_process_consumer_load_initial_offsets(self):
+ self.send_messages(0, range(0, 10))
+ self.send_messages(1, range(10, 20))
+
+ # Create 1st consumer and change offsets
+ consumer = self.consumer()
+ self.assertEqual(consumer.offsets, {0: 0, 1: 0})
+ consumer.offsets.update({0:5, 1:15})
+ # Update counter after manual offsets update
+ consumer.count_since_commit += 1
+ consumer.commit()
+
+ # Create 2nd consumer and check initial offsets
+ consumer = self.consumer(consumer = MultiProcessConsumer,
+ auto_commit=False)
+ self.assertEqual(consumer.offsets, {0: 5, 1: 15})
+
@kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
@@ -327,6 +365,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_multi_process_offset_behavior__resuming_behavior(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ idx = 0
+ for message in consumer1:
+ output_msgs1.append(message.message.value)
+ idx += 1
+ if idx >= 195:
+ break
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # 181-200
+ self.assert_message_count([ message for message in consumer2 ], 20)
+
+ consumer1.stop()
+ consumer2.stop()
+
# TODO: Make this a unit test -- should not require integration
@kafka_versions("all")
def test_fetch_buffer_size(self):
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 7d27526..15f0338 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestCase):
client = KafkaClient(hosts)
group = random_string(10)
- consumer = SimpleConsumer(client, group, topic,
+ consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)