diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-12 12:14:05 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-12 18:21:32 -0800 |
commit | fb0b49827ff78bebd0a84c86d890394b00795bcf (patch) | |
tree | 361cdbb930685648cf82215b688f6a4c0f4b63c5 | |
parent | 047a65f1d9965f5b6913b18fabb3f44f8a726430 (diff) | |
download | kafka-python-fb0b49827ff78bebd0a84c86d890394b00795bcf.tar.gz |
Add heartbeat timeout test
-rw-r--r-- | test/test_consumer_group.py | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 6ef2020..3d10f8f 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -1,16 +1,17 @@ import collections import logging import threading -import os import time import pytest import six -from kafka import SimpleClient, SimpleProducer +from kafka import SimpleClient from kafka.common import TopicPartition -from kafka.conn import BrokerConnection, ConnectionStates +from kafka.conn import ConnectionStates from kafka.consumer.group import KafkaConsumer +from kafka.future import Future +from kafka.protocol.metadata import MetadataResponse from test.conftest import version from test.testutil import random_string @@ -115,3 +116,23 @@ def test_group(kafka_broker, topic): finally: for c in range(num_consumers): stop[c].set() + + +@pytest.fixture +def conn(mocker): + conn = mocker.patch('kafka.client_async.BrokerConnection') + conn.return_value = conn + conn.state = ConnectionStates.CONNECTED + conn.send.return_value = Future().success( + MetadataResponse( + [(0, 'foo', 12), (1, 'bar', 34)], # brokers + [])) # topics + return conn + + +def test_heartbeat_timeout(conn, mocker): + mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9') + mocker.patch('time.time', return_value = 1234) + consumer = KafkaConsumer('foobar') + mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0) + assert consumer._next_timeout() == 1234 |