summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer_group.py8
-rw-r--r--test/test_consumer_integration.py1
-rw-r--r--test/test_fetcher.py16
3 files changed, 3 insertions, 22 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 1acde5e..9d9be60 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
-
-
-def test_heartbeat_timeout(conn, mocker):
- mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9))
- mocker.patch('time.time', return_value = 1234)
- consumer = KafkaConsumer('foobar')
- mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
- assert consumer._next_timeout() == 1234
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9c27eee..998045f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
+ enable_auto_commit=False,
consumer_timeout_ms=TIMEOUT_MS)
# Manual assignment avoids overhead of consumer group mgmt
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 6afd547..fea3f7d 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -33,7 +33,7 @@ def fetcher(client, subscription_state):
return Fetcher(client, subscription_state, Metrics())
-def test_init_fetches(fetcher, mocker):
+def test_send_fetches(fetcher, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
@@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker):
mocker.patch.object(fetcher, '_create_fetch_requests',
return_value = dict(enumerate(fetch_requests)))
- fetcher._records.append('foobar')
- ret = fetcher.init_fetches()
- assert fetcher._create_fetch_requests.call_count == 0
- assert ret == []
- fetcher._records.clear()
-
- fetcher._iterator = 'foo'
- ret = fetcher.init_fetches()
- assert fetcher._create_fetch_requests.call_count == 0
- assert ret == []
- fetcher._iterator = None
-
- ret = fetcher.init_fetches()
+ ret = fetcher.send_fetches()
for node, request in enumerate(fetch_requests):
fetcher._client.send.assert_any_call(node, request)
assert len(ret) == len(fetch_requests)