author     Dana Powers <dana.powers@gmail.com>    2017-12-21 14:46:10 -0800
committer  GitHub <noreply@github.com>            2017-12-21 14:46:10 -0800
commit     ad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
tree       f1993351b2c6487e8e623cefabf42ddf7477f666 /test
parent     995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
download   kafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'test')
-rw-r--r--  test/test_client_async.py           11
-rw-r--r--  test/test_consumer.py                4
-rw-r--r--  test/test_consumer_group.py         45
-rw-r--r--  test/test_consumer_integration.py    3
-rw-r--r--  test/test_coordinator.py           100
5 files changed, 107 insertions(+), 56 deletions(-)
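
The tests below track the core change of this commit: heartbeats are no longer scheduled as delayed tasks on the client's event loop but are sent from a dedicated daemon thread (the KIP-62 design). For orientation, here is a minimal sketch of that pattern; the class and method names are assumptions for illustration, not kafka-python's actual implementation:

    # Illustrative sketch of a background-heartbeat thread (names assumed,
    # not kafka-python's real classes).
    import threading
    import time

    class BackgroundHeartbeat(threading.Thread):
        def __init__(self, send_heartbeat, interval=3.0):
            super(BackgroundHeartbeat, self).__init__()
            self.daemon = True                 # must not block interpreter exit
            self._send_heartbeat = send_heartbeat
            self._interval = interval
            self._enabled = False
            self._closed = False
            self._cond = threading.Condition()

        def enable(self):
            with self._cond:
                self._enabled = True
                self._cond.notify()

        def disable(self):
            with self._cond:
                self._enabled = False

        def close(self):
            with self._cond:
                self._closed = True
                self._cond.notify()

        def run(self):
            while True:
                with self._cond:
                    if self._closed:
                        return
                    if not self._enabled:
                        self._cond.wait()      # park until enable()/close()
                        continue
                self._send_heartbeat()         # e.g. a coordinator request
                time.sleep(self._interval)

    # Usage: heartbeat while group membership is stable, disable during
    # rebalances, close on shutdown -- mirroring the tests in this diff.
    hb = BackgroundHeartbeat(lambda: print('heartbeat'), interval=0.5)
    hb.start()
    hb.enable()
    time.sleep(1.2)
    hb.close()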
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ec45543..eece139 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -253,11 +253,9 @@ def test_poll(mocker):
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(api_version=(0, 9))
- tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
# metadata timeout wins
metadata.return_value = 1000
- tasks.return_value = 2
cli.poll()
_poll.assert_called_with(1.0)
@@ -265,14 +263,8 @@ def test_poll(mocker):
cli.poll(250)
_poll.assert_called_with(0.25)
- # tasks timeout wins
- tasks.return_value = 0
- cli.poll(250)
- _poll.assert_called_with(0)
-
# default is request_timeout_ms
metadata.return_value = 1000000
- tasks.return_value = 10000
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
@@ -325,9 +317,6 @@ def client(mocker):
connections_max_idle_ms=float('inf'),
api_version=(0, 9))
- tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
- tasks.return_value = 9999999
-
ttl = mocker.patch.object(cli.cluster, 'ttl')
ttl.return_value = 0
return cli
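
With _delayed_tasks gone, the timeout that KafkaClient.poll() passes to _poll() is simply the nearest of the remaining deadlines: the metadata refresh deadline, the caller-supplied timeout, and request_timeout_ms, as the surviving assertions check. A sketch of that selection, assuming millisecond units and an illustrative function name:

    def effective_poll_timeout_ms(metadata_timeout_ms, request_timeout_ms,
                                  caller_timeout_ms=None):
        # Wait only until the nearest deadline: metadata refresh, the
        # caller's timeout (if given), or the request-timeout default.
        candidates = [metadata_timeout_ms, request_timeout_ms]
        if caller_timeout_ms is not None:
            candidates.append(caller_timeout_ms)
        return min(candidates)

    # Mirrors the assertions above: 1000ms -> _poll(1.0), 250ms -> _poll(0.25),
    # and with metadata far in the future the request timeout wins.
    assert effective_poll_timeout_ms(1000, 40000) == 1000
    assert effective_poll_timeout_ms(1000, 40000, caller_timeout_ms=250) == 250
    assert effective_poll_timeout_ms(1000000, 40000) == 40000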
diff --git a/test/test_consumer.py b/test/test_consumer.py
index e5dd946..013529f 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -14,11 +14,11 @@ from kafka.structs import (
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
- SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+ SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
def test_session_timeout_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
- KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
+ KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
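
The added api_version and group_id arguments suggest the session-timeout validation now applies only to consumers that will actually join a group. A usage sketch of the rejected configuration, mirroring the test (as in the unit test, no broker needs to be running, since the check fires during construction; the group id is illustrative):

    from kafka import KafkaConsumer
    from kafka.errors import KafkaConfigurationError

    try:
        KafkaConsumer(bootstrap_servers='localhost:9092',
                      api_version=(0, 9),
                      group_id='demo-group',        # illustrative group id
                      session_timeout_ms=60000,     # larger than...
                      request_timeout_ms=40000)     # ...the request timeout
    except KafkaConfigurationError as err:
        print('rejected:', err)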
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 8f25e9f..690d45a 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -9,6 +9,7 @@ import six
from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
+from kafka.coordinator.base import MemberState, Generation
from kafka.structs import TopicPartition
from test.conftest import version
@@ -92,9 +93,10 @@ def test_group(kafka_broker, topic):
# If all consumers exist and have an assignment
else:
+ logging.info('All consumers have assignment... checking for stable group')
# Verify all consumers are in the same generation
# then log state and break while loop
- generations = set([consumer._coordinator.generation
+ generations = set([consumer._coordinator._generation.generation_id
for consumer in list(consumers.values())])
# New generation assignment is not complete until
@@ -105,12 +107,16 @@ def test_group(kafka_broker, topic):
if not rejoining and len(generations) == 1:
for c, consumer in list(consumers.items()):
logging.info("[%s] %s %s: %s", c,
- consumer._coordinator.generation,
- consumer._coordinator.member_id,
+ consumer._coordinator._generation.generation_id,
+ consumer._coordinator._generation.member_id,
consumer.assignment())
break
+ else:
+ logging.info('Rejoining: %s, generations: %s', rejoining, generations)
+ time.sleep(1)
assert time.time() < timeout, "timeout waiting for assignments"
+ logging.info('Group stabilized; verifying assignment')
group_assignment = set()
for c in range(num_consumers):
assert len(consumers[c].assignment()) != 0
@@ -120,9 +126,12 @@ def test_group(kafka_broker, topic):
assert group_assignment == set([
TopicPartition(topic, partition)
for partition in range(num_partitions)])
+ logging.info('Assignment looks good!')
finally:
+ logging.info('Shutting down %s consumers', num_consumers)
for c in range(num_consumers):
+ logging.info('Stopping consumer %s', c)
stop[c].set()
threads[c].join()
@@ -143,3 +152,33 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
+
+
+@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_heartbeat_thread(kafka_broker, topic):
+ group_id = 'test-group-' + random_string(6)
+ consumer = KafkaConsumer(topic,
+ bootstrap_servers=get_connect_str(kafka_broker),
+ group_id=group_id,
+ heartbeat_interval_ms=500)
+
+ # poll until we have joined group / have assignment
+ while not consumer.assignment():
+ consumer.poll(timeout_ms=100)
+
+ assert consumer._coordinator.state is MemberState.STABLE
+ last_poll = consumer._coordinator.heartbeat.last_poll
+ last_beat = consumer._coordinator.heartbeat.last_send
+
+ timeout = time.time() + 30
+ while True:
+ if time.time() > timeout:
+ raise RuntimeError('timeout waiting for heartbeat')
+ if consumer._coordinator.heartbeat.last_send > last_beat:
+ break
+ time.sleep(0.5)
+
+ assert consumer._coordinator.heartbeat.last_poll == last_poll
+ consumer.poll(timeout_ms=100)
+ assert consumer._coordinator.heartbeat.last_poll > last_poll
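
This test pins down the KIP-62 split that the heartbeat thread enables: heartbeat.last_send advances in the background with no poll() call, while heartbeat.last_poll moves only when the application polls. A minimal sketch of that two-timestamp liveness model, with an illustrative class (not kafka-python's Heartbeat):

    import time

    class TwoTimestampLiveness(object):
        """Track broker liveness (heartbeats) separately from app liveness (polls)."""

        def __init__(self, session_timeout=10.0, max_poll_interval=300.0):
            now = time.time()
            self.last_send = now              # updated by the heartbeat thread
            self.last_poll = now              # updated only by poll()
            self._session_timeout = session_timeout
            self._max_poll_interval = max_poll_interval

        def sent_heartbeat(self):
            self.last_send = time.time()

        def poll(self):
            self.last_poll = time.time()

        def session_expired(self):
            # Broker-side check: heartbeats stopped flowing.
            return time.time() - self.last_send > self._session_timeout

        def poll_expired(self):
            # Application-side check: too long between poll() calls, so the
            # member should leave the group even though heartbeats continue.
            return time.time() - self.last_poll > self._max_poll_interval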
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d1843b3..ded2314 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -739,7 +739,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_times_errors(self):
- consumer = self.kafka_consumer()
+ consumer = self.kafka_consumer(fetch_max_wait_ms=200,
+ request_timeout_ms=500)
tp = TopicPartition(self.topic, 0)
bad_tp = TopicPartition(self.topic, 100)
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 0e96110..7dc0e04 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -10,6 +10,7 @@ from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.base import Generation, MemberState, HeartbeatThread
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
@@ -43,13 +44,13 @@ def test_autocommit_enable_api_version(client, api_version):
coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(),
enable_auto_commit=True,
+ session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms
+ max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError
group_id='foobar',
api_version=api_version)
if api_version < (0, 8, 1):
- assert coordinator._auto_commit_task is None
assert coordinator.config['enable_auto_commit'] is False
else:
- assert coordinator._auto_commit_task is not None
assert coordinator.config['enable_auto_commit'] is True
@@ -269,19 +270,19 @@ def test_close(mocker, coordinator):
mocker.patch.object(coordinator, '_handle_leave_group_response')
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
- coordinator.generation = 1
+ coordinator._generation = Generation(1, 'foobar', b'')
+ coordinator.state = MemberState.STABLE
cli = coordinator._client
- mocker.patch.object(cli, 'unschedule')
mocker.patch.object(cli, 'send', return_value=Future().success('foobar'))
mocker.patch.object(cli, 'poll')
coordinator.close()
assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
- cli.unschedule.assert_called_with(coordinator.heartbeat_task)
coordinator._handle_leave_group_response.assert_called_with('foobar')
- assert coordinator.generation == -1
- assert coordinator.member_id == ''
+ assert coordinator.generation() is None
+ assert coordinator._generation is Generation.NO_GENERATION
+ assert coordinator.state is MemberState.UNJOINED
assert coordinator.rejoin_needed is True
@@ -296,6 +297,7 @@ def offsets():
def test_commit_offsets_async(mocker, coordinator, offsets):
mocker.patch.object(coordinator._client, 'poll')
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
+ mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
coordinator.commit_offsets_async(offsets)
@@ -362,19 +364,21 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(),
api_version=api_version,
+ session_timeout_ms=30000,
+ max_poll_interval_ms=30000,
enable_auto_commit=enable,
group_id=group_id)
commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
side_effect=error)
if has_auto_commit:
- assert coordinator._auto_commit_task is not None
+ assert coordinator.next_auto_commit_deadline is not None
else:
- assert coordinator._auto_commit_task is None
+ assert coordinator.next_auto_commit_deadline is None
assert coordinator._maybe_auto_commit_offsets_sync() is None
if has_auto_commit:
- assert coordinator._auto_commit_task is not None
+ assert coordinator.next_auto_commit_deadline is not None
assert commit_sync.call_count == (1 if commit_offsets else 0)
assert mock_warn.call_count == (1 if warn else 0)
@@ -387,24 +391,25 @@ def patched_coord(mocker, coordinator):
coordinator._subscription.needs_partition_assignment = False
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
- coordinator.generation = 0
+ mocker.patch.object(coordinator, 'coordinator', return_value=0)
+ coordinator._generation = Generation(0, 'foobar', b'')
+ coordinator.state = MemberState.STABLE
+ coordinator.rejoin_needed = False
mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
mocker.patch.object(coordinator._client, 'least_loaded_node',
return_value=1)
mocker.patch.object(coordinator._client, 'ready', return_value=True)
mocker.patch.object(coordinator._client, 'send')
- mocker.patch.object(coordinator._client, 'schedule')
mocker.spy(coordinator, '_failed_request')
mocker.spy(coordinator, '_handle_offset_commit_response')
mocker.spy(coordinator, '_handle_offset_fetch_response')
- mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
- mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
return coordinator
-def test_send_offset_commit_request_fail(patched_coord, offsets):
+def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
patched_coord.coordinator_unknown.return_value = True
patched_coord.coordinator_id = None
+ patched_coord.coordinator.return_value = None
# No offsets
ret = patched_coord._send_offset_commit_request({})
@@ -488,7 +493,14 @@ def test_handle_offset_commit_response(mocker, patched_coord, offsets,
response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
- assert patched_coord._subscription.needs_partition_assignment is reassign
+ if reassign:
+ assert patched_coord._generation is Generation.NO_GENERATION
+ assert patched_coord.rejoin_needed is True
+ assert patched_coord.state is MemberState.UNJOINED
+ else:
+ assert patched_coord._generation is not Generation.NO_GENERATION
+ assert patched_coord.rejoin_needed is False
+ assert patched_coord.state is MemberState.STABLE
@pytest.fixture
@@ -496,9 +508,10 @@ def partitions():
return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]
-def test_send_offset_fetch_request_fail(patched_coord, partitions):
+def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
patched_coord.coordinator_unknown.return_value = True
patched_coord.coordinator_id = None
+ patched_coord.coordinator.return_value = None
# No partitions
ret = patched_coord._send_offset_fetch_request([])
@@ -551,28 +564,18 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
future, response)
-@pytest.mark.parametrize('response,error,dead,reassign', [
- #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
- # Errors.GroupAuthorizationFailedError, False, False),
- #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
- # Errors.RequestTimedOutError, True, False),
- #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
- # Errors.RebalanceInProgressError, False, True),
+@pytest.mark.parametrize('response,error,dead', [
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
- Errors.GroupLoadInProgressError, False, False),
+ Errors.GroupLoadInProgressError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
- Errors.NotCoordinatorForGroupError, True, False),
- (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
- Errors.UnknownMemberIdError, False, True),
- (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
- Errors.IllegalGenerationError, False, True),
+ Errors.NotCoordinatorForGroupError, True),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
- Errors.TopicAuthorizationFailedError, False, False),
+ Errors.TopicAuthorizationFailedError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
- None, False, False),
+ None, False),
])
def test_handle_offset_fetch_response(patched_coord, offsets,
- response, error, dead, reassign):
+ response, error, dead):
future = Future()
patched_coord._handle_offset_fetch_response(future, response)
if error is not None:
@@ -581,15 +584,34 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
assert future.succeeded()
assert future.value == offsets
assert patched_coord.coordinator_id is (None if dead else 0)
- assert patched_coord._subscription.needs_partition_assignment is reassign
-def test_heartbeat(patched_coord):
- patched_coord.coordinator_unknown.return_value = True
+def test_heartbeat(mocker, patched_coord):
+ heartbeat = HeartbeatThread(patched_coord)
+
+ assert not heartbeat.enabled and not heartbeat.closed
+
+ heartbeat.enable()
+ assert heartbeat.enabled
+
+ heartbeat.disable()
+ assert not heartbeat.enabled
+
+ # heartbeat disables when un-joined
+ heartbeat.enable()
+ patched_coord.state = MemberState.UNJOINED
+ heartbeat._run_once()
+ assert not heartbeat.enabled
+
+ heartbeat.enable()
+ patched_coord.state = MemberState.STABLE
+ mocker.spy(patched_coord, '_send_heartbeat_request')
+ mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True)
+ heartbeat._run_once()
+ assert patched_coord._send_heartbeat_request.call_count == 1
- patched_coord.heartbeat_task()
- assert patched_coord._client.schedule.call_count == 1
- assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
+ heartbeat.close()
+ assert heartbeat.closed
def test_lookup_coordinator_failure(mocker, coordinator):