summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-04 09:36:27 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-04 09:36:27 -0700
commit51fc3e428b7782d3533512c39264552a2ec87f0f (patch)
treee1b1540f98d4f8cdf540c4ecc7d805fa1369bada
parentb96f4ccf070109a022deb98b569e61d23e4e75b9 (diff)
parent3d1c3521db701047215831d4f84a6c653f087250 (diff)
downloadkafka-python-51fc3e428b7782d3533512c39264552a2ec87f0f.tar.gz
Merge pull request #620 from dpkp/issue_619
Improve auto commit task handling with no consumer group
-rw-r--r--kafka/coordinator/consumer.py42
-rw-r--r--test/test_coordinator.py54
2 files changed, 57 insertions, 39 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a5e3067..b2ef1ea 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -91,8 +91,10 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('Broker version (%s) does not support offset'
' commits; disabling auto-commit.',
self.config['api_version'])
+ self.config['enable_auto_commit'] = False
elif self.config['group_id'] is None:
log.warning('group_id is None: disabling auto-commit.')
+ self.config['enable_auto_commit'] = False
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
@@ -192,7 +194,7 @@ class ConsumerCoordinator(BaseCoordinator):
assignor.on_assignment(assignment)
# restart the autocommit task if needed
- if self.config['enable_auto_commit']:
+ if self._auto_commit_task:
self._auto_commit_task.enable()
assigned = set(self._subscription.assigned_partitions())
@@ -364,27 +366,27 @@ class ConsumerCoordinator(BaseCoordinator):
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _maybe_auto_commit_offsets_sync(self):
- if self.config['api_version'] < (0, 8, 1):
+ if self._auto_commit_task is None:
return
- if self.config['enable_auto_commit']:
- # disable periodic commits prior to committing synchronously. note that they will
- # be re-enabled after a rebalance completes
- self._auto_commit_task.disable()
- try:
- self.commit_offsets_sync(self._subscription.all_consumed_offsets())
-
- # The three main group membership errors are known and should not
- # require a stacktrace -- just a warning
- except (Errors.UnknownMemberIdError,
- Errors.IllegalGenerationError,
- Errors.RebalanceInProgressError):
- log.warning("Offset commit failed: group membership out of date"
- " This is likely to cause duplicate message"
- " delivery.")
- except Exception:
- log.exception("Offset commit failed: This is likely to cause"
- " duplicate message delivery")
+ # disable periodic commits prior to committing synchronously. note that they will
+ # be re-enabled after a rebalance completes
+ self._auto_commit_task.disable()
+
+ try:
+ self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+
+ # The three main group membership errors are known and should not
+ # require a stacktrace -- just a warning
+ except (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError,
+ Errors.RebalanceInProgressError):
+ log.warning("Offset commit failed: group membership out of date"
+ " This is likely to cause duplicate message"
+ " delivery.")
+ except Exception:
+ log.exception("Offset commit failed: This is likely to cause"
+ " duplicate message delivery")
def _send_offset_commit_request(self, offsets):
"""Commit offsets for the specified list of topics and partitions.
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 847cbc1..44db808 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -52,12 +52,16 @@ def test_init(conn):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(conn, api_version):
- coordinator = ConsumerCoordinator(
- KafkaClient(), SubscriptionState(), api_version=api_version)
+ coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ enable_auto_commit=True,
+ group_id='foobar',
+ api_version=api_version)
if api_version < (0, 8, 1):
assert coordinator._auto_commit_task is None
+ assert coordinator.config['enable_auto_commit'] is False
else:
assert coordinator._auto_commit_task is not None
+ assert coordinator.config['enable_auto_commit'] is True
def test_protocol_type(coordinator):
@@ -349,28 +353,40 @@ def test_commit_offsets_sync(mocker, coordinator, offsets):
@pytest.mark.parametrize(
- 'api_version,enable,error,task_disable,commit_offsets,warn,exc', [
- ((0, 8), True, None, False, False, False, False),
- ((0, 9), False, None, False, False, False, False),
- ((0, 9), True, Errors.UnknownMemberIdError(), True, True, True, False),
- ((0, 9), True, Errors.IllegalGenerationError(), True, True, True, False),
- ((0, 9), True, Errors.RebalanceInProgressError(), True, True, True, False),
- ((0, 9), True, Exception(), True, True, False, True),
- ((0, 9), True, None, True, True, False, False),
+ 'api_version,group_id,enable,error,has_auto_commit,commit_offsets,warn,exc', [
+ ((0, 8), 'foobar', True, None, False, False, True, False),
+ ((0, 9), 'foobar', False, None, False, False, False, False),
+ ((0, 9), 'foobar', True, Errors.UnknownMemberIdError(), True, True, True, False),
+ ((0, 9), 'foobar', True, Errors.IllegalGenerationError(), True, True, True, False),
+ ((0, 9), 'foobar', True, Errors.RebalanceInProgressError(), True, True, True, False),
+ ((0, 9), 'foobar', True, Exception(), True, True, False, True),
+ ((0, 9), 'foobar', True, None, True, True, False, False),
+ ((0, 9), None, True, None, False, False, True, False),
])
-def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
- api_version, enable, error, task_disable,
- commit_offsets, warn, exc):
- auto_commit_task = mocker.patch.object(coordinator, '_auto_commit_task')
- commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
- side_effect=error)
+def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
+ error, has_auto_commit, commit_offsets,
+ warn, exc):
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
+ coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ api_version=api_version,
+ enable_auto_commit=enable,
+ group_id=group_id)
+ commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
+ side_effect=error)
+ if has_auto_commit:
+ assert coordinator._auto_commit_task is not None
+ coordinator._auto_commit_task.enable()
+ assert coordinator._auto_commit_task._enabled is True
+ else:
+ assert coordinator._auto_commit_task is None
- coordinator.config['api_version'] = api_version
- coordinator.config['enable_auto_commit'] = enable
assert coordinator._maybe_auto_commit_offsets_sync() is None
- assert auto_commit_task.disable.call_count == (1 if task_disable else 0)
+
+ if has_auto_commit:
+ assert coordinator._auto_commit_task is not None
+ assert coordinator._auto_commit_task._enabled is False
+
assert commit_sync.call_count == (1 if commit_offsets else 0)
assert mock_warn.call_count == (1 if warn else 0)
assert mock_exc.call_count == (1 if exc else 0)