summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-29 17:24:56 -0700
committerDana Powers <dana.powers@rd.io>2015-03-30 15:02:44 -0700
commitb6d032cc3f1b53a6d5b395f9b14de62f547c8f1c (patch)
tree2111012432d8f23991068d3c6ea438da8fe10d2e /test
parent32dd817aac4130a019339afac7ef52f2b9b7acd4 (diff)
downloadkafka-python-b6d032cc3f1b53a6d5b395f9b14de62f547c8f1c.tar.gz
Fetch previously committed offsets in base consumer class so long as
a group is configured (but document that group must be None for old servers). This fixes multiprocessor consumer issue that prevented access to commit offsets if auto_commit is disabled. Also refactor fetch_last_known_offsets based on KafkaConsumer While still setting unknown offsets to 0
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer_integration.py5
-rw-r--r--test/test_failover_integration.py2
2 files changed, 5 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fd62d9b..403ce0f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def consumer(self, **kwargs):
if os.environ['KAFKA_VERSION'] == "0.8.0":
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['group'] = None
kwargs['auto_commit'] = False
else:
kwargs.setdefault('auto_commit', True)
@@ -260,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+ # set group to None and auto_commit to False to avoid interactions w/
+ # offset commit/fetch apis
+ consumer = MultiProcessConsumer(self.client, None, self.topic,
auto_commit=False, iter_timeout=0)
self.assertEqual(consumer.pending(), 20)
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 7d27526..15f0338 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestCase):
client = KafkaClient(hosts)
group = random_string(10)
- consumer = SimpleConsumer(client, group, topic,
+ consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)