diff options
-rw-r--r-- | test/test_consumer_group.py | 5 | ||||
-rw-r--r-- | test/test_coordinator.py | 1 | ||||
-rw-r--r-- | test/test_producer.py | 1 |
3 files changed, 5 insertions, 2 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 690d45a..b930748 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -68,8 +68,8 @@ def test_group(kafka_broker, topic): for tp, records in six.itervalues(consumers[i].poll(100)): messages[i][tp].extend(records) consumers[i].close() - del consumers[i] - del stop[i] + consumers[i] = None + stop[i] = None num_consumers = 4 for i in range(num_consumers): @@ -134,6 +134,7 @@ def test_group(kafka_broker, topic): logging.info('Stopping consumer %s', c) stop[c].set() threads[c].join() + threads[c] = None @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") diff --git a/test/test_coordinator.py b/test/test_coordinator.py index f567369..e094b9c 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -400,6 +400,7 @@ def patched_coord(mocker, coordinator): return_value=1) mocker.patch.object(coordinator._client, 'ready', return_value=True) mocker.patch.object(coordinator._client, 'send') + mocker.patch.object(coordinator, '_heartbeat_thread') mocker.spy(coordinator, '_failed_request') mocker.spy(coordinator, '_handle_offset_commit_response') mocker.spy(coordinator, '_handle_offset_fetch_response') diff --git a/test/test_producer.py b/test/test_producer.py index 20dffc2..f7a5b68 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -72,6 +72,7 @@ def test_end_to_end(kafka_broker, compression): @pytest.mark.skipif(platform.python_implementation() != 'CPython', reason='Test relies on CPython-specific gc policies') def test_kafka_producer_gc_cleanup(): + gc.collect() threads = threading.active_count() producer = KafkaProducer(api_version='0.9') # set api_version explicitly to avoid auto-detection assert threading.active_count() == threads + 1 |