diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-04 09:36:27 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-04 09:36:27 -0700 |
commit | 51fc3e428b7782d3533512c39264552a2ec87f0f (patch) | |
tree | e1b1540f98d4f8cdf540c4ecc7d805fa1369bada | |
parent | b96f4ccf070109a022deb98b569e61d23e4e75b9 (diff) | |
parent | 3d1c3521db701047215831d4f84a6c653f087250 (diff) | |
download | kafka-python-51fc3e428b7782d3533512c39264552a2ec87f0f.tar.gz |
Merge pull request #620 from dpkp/issue_619
Improve auto commit task handling with no consumer group
-rw-r--r-- | kafka/coordinator/consumer.py | 42 | ||||
-rw-r--r-- | test/test_coordinator.py | 54 |
2 files changed, 57 insertions, 39 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a5e3067..b2ef1ea 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -91,8 +91,10 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('Broker version (%s) does not support offset' ' commits; disabling auto-commit.', self.config['api_version']) + self.config['enable_auto_commit'] = False elif self.config['group_id'] is None: log.warning('group_id is None: disabling auto-commit.') + self.config['enable_auto_commit'] = False else: interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) @@ -192,7 +194,7 @@ class ConsumerCoordinator(BaseCoordinator): assignor.on_assignment(assignment) # restart the autocommit task if needed - if self.config['enable_auto_commit']: + if self._auto_commit_task: self._auto_commit_task.enable() assigned = set(self._subscription.assigned_partitions()) @@ -364,27 +366,27 @@ class ConsumerCoordinator(BaseCoordinator): time.sleep(self.config['retry_backoff_ms'] / 1000.0) def _maybe_auto_commit_offsets_sync(self): - if self.config['api_version'] < (0, 8, 1): + if self._auto_commit_task is None: return - if self.config['enable_auto_commit']: - # disable periodic commits prior to committing synchronously. note that they will - # be re-enabled after a rebalance completes - self._auto_commit_task.disable() - try: - self.commit_offsets_sync(self._subscription.all_consumed_offsets()) - - # The three main group membership errors are known and should not - # require a stacktrace -- just a warning - except (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): - log.warning("Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery.") - except Exception: - log.exception("Offset commit failed: This is likely to cause" - " duplicate message delivery") + # disable periodic commits prior to committing synchronously. note that they will + # be re-enabled after a rebalance completes + self._auto_commit_task.disable() + + try: + self.commit_offsets_sync(self._subscription.all_consumed_offsets()) + + # The three main group membership errors are known and should not + # require a stacktrace -- just a warning + except (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError): + log.warning("Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery.") + except Exception: + log.exception("Offset commit failed: This is likely to cause" + " duplicate message delivery") def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 847cbc1..44db808 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -52,12 +52,16 @@ def test_init(conn): @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) def test_autocommit_enable_api_version(conn, api_version): - coordinator = ConsumerCoordinator( - KafkaClient(), SubscriptionState(), api_version=api_version) + coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + enable_auto_commit=True, + group_id='foobar', + api_version=api_version) if api_version < (0, 8, 1): assert coordinator._auto_commit_task is None + assert coordinator.config['enable_auto_commit'] is False else: assert coordinator._auto_commit_task is not None + assert coordinator.config['enable_auto_commit'] is True def test_protocol_type(coordinator): @@ -349,28 +353,40 @@ def test_commit_offsets_sync(mocker, coordinator, offsets): @pytest.mark.parametrize( - 'api_version,enable,error,task_disable,commit_offsets,warn,exc', [ - ((0, 8), True, None, False, False, False, False), - ((0, 9), False, None, False, False, False, False), - ((0, 9), True, Errors.UnknownMemberIdError(), True, True, True, False), - ((0, 9), True, Errors.IllegalGenerationError(), True, True, True, False), - ((0, 9), True, Errors.RebalanceInProgressError(), True, True, True, False), - ((0, 9), True, Exception(), True, True, False, True), - ((0, 9), True, None, True, True, False, False), + 'api_version,group_id,enable,error,has_auto_commit,commit_offsets,warn,exc', [ + ((0, 8), 'foobar', True, None, False, False, True, False), + ((0, 9), 'foobar', False, None, False, False, False, False), + ((0, 9), 'foobar', True, Errors.UnknownMemberIdError(), True, True, True, False), + ((0, 9), 'foobar', True, Errors.IllegalGenerationError(), True, True, True, False), + ((0, 9), 'foobar', True, Errors.RebalanceInProgressError(), True, True, True, False), + ((0, 9), 'foobar', True, Exception(), True, True, False, True), + ((0, 9), 'foobar', True, None, True, True, False, False), + ((0, 9), None, True, None, False, False, True, False), ]) -def test_maybe_auto_commit_offsets_sync(mocker, coordinator, - api_version, enable, error, task_disable, - commit_offsets, warn, exc): - auto_commit_task = mocker.patch.object(coordinator, '_auto_commit_task') - commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', - side_effect=error) +def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, + error, has_auto_commit, commit_offsets, + warn, exc): mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') + coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + api_version=api_version, + enable_auto_commit=enable, + group_id=group_id) + commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', + side_effect=error) + if has_auto_commit: + assert coordinator._auto_commit_task is not None + coordinator._auto_commit_task.enable() + assert coordinator._auto_commit_task._enabled is True + else: + assert coordinator._auto_commit_task is None - coordinator.config['api_version'] = api_version - coordinator.config['enable_auto_commit'] = enable assert coordinator._maybe_auto_commit_offsets_sync() is None - assert auto_commit_task.disable.call_count == (1 if task_disable else 0) + + if has_auto_commit: + assert coordinator._auto_commit_task is not None + assert coordinator._auto_commit_task._enabled is False + assert commit_sync.call_count == (1 if commit_offsets else 0) assert mock_warn.call_count == (1 if warn else 0) assert mock_exc.call_count == (1 if exc else 0) |