diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_consumer_group.py | 8 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 1 | ||||
-rw-r--r-- | test/test_fetcher.py | 16 |
3 files changed, 3 insertions, 22 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 1acde5e..9d9be60 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic): consumer.unsubscribe() assert set() == consumer.paused() - - -def test_heartbeat_timeout(conn, mocker): - mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9)) - mocker.patch('time.time', return_value = 1234) - consumer = KafkaConsumer('foobar') - mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0) - assert consumer._next_timeout() == 1234 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9c27eee..998045f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', + enable_auto_commit=False, consumer_timeout_ms=TIMEOUT_MS) # Manual assignment avoids overhead of consumer group mgmt diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 6afd547..fea3f7d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -33,7 +33,7 @@ def fetcher(client, subscription_state): return Fetcher(client, subscription_state, Metrics()) -def test_init_fetches(fetcher, mocker): +def test_send_fetches(fetcher, mocker): fetch_requests = [ FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], @@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker): mocker.patch.object(fetcher, '_create_fetch_requests', return_value = dict(enumerate(fetch_requests))) - fetcher._records.append('foobar') - ret = fetcher.init_fetches() - assert fetcher._create_fetch_requests.call_count == 0 - assert ret == [] - fetcher._records.clear() - - fetcher._iterator = 'foo' - ret = fetcher.init_fetches() - assert fetcher._create_fetch_requests.call_count == 0 - assert ret == [] - fetcher._iterator = None - - ret = fetcher.init_fetches() + ret = fetcher.send_fetches() for node, request in enumerate(fetch_requests): fetcher._client.send.assert_any_call(node, request) assert len(ret) == len(fetch_requests) |