summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-12 12:14:05 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-12 18:21:32 -0800
commitfb0b49827ff78bebd0a84c86d890394b00795bcf (patch)
tree361cdbb930685648cf82215b688f6a4c0f4b63c5
parent047a65f1d9965f5b6913b18fabb3f44f8a726430 (diff)
downloadkafka-python-fb0b49827ff78bebd0a84c86d890394b00795bcf.tar.gz
Add heartbeat timeout test
-rw-r--r--test/test_consumer_group.py27
1 files changed, 24 insertions, 3 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 6ef2020..3d10f8f 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -1,16 +1,17 @@
import collections
import logging
import threading
-import os
import time
import pytest
import six
-from kafka import SimpleClient, SimpleProducer
+from kafka import SimpleClient
from kafka.common import TopicPartition
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
+from kafka.future import Future
+from kafka.protocol.metadata import MetadataResponse
from test.conftest import version
from test.testutil import random_string
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
finally:
for c in range(num_consumers):
stop[c].set()
+
+
+@pytest.fixture
+def conn(mocker):
+ conn = mocker.patch('kafka.client_async.BrokerConnection')
+ conn.return_value = conn
+ conn.state = ConnectionStates.CONNECTED
+ conn.send.return_value = Future().success(
+ MetadataResponse(
+ [(0, 'foo', 12), (1, 'bar', 34)], # brokers
+ [])) # topics
+ return conn
+
+
+def test_heartbeat_timeout(conn, mocker):
+ mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
+ mocker.patch('time.time', return_value = 1234)
+ consumer = KafkaConsumer('foobar')
+ mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
+ assert consumer._next_timeout() == 1234