diff options
-rw-r--r-- | kafka/coordinator/consumer.py | 52 | ||||
-rw-r--r-- | test/test_coordinator.py | 5 |
2 files changed, 8 insertions, 49 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a18329c..517f66a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -105,19 +105,12 @@ class ConsumerCoordinator(BaseCoordinator): else: interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) - - # When using broker-coordinated consumer groups, auto-commit will - # be automatically enabled on group join (see _on_join_complete) - # Otherwise, we should enable now b/c there will be no group join - if self.config['api_version'] < (0, 9): - self._auto_commit_task.enable() + self._auto_commit_task.reschedule() self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, self._subscription) def __del__(self): - if hasattr(self, '_auto_commit_task') and self._auto_commit_task: - self._auto_commit_task.disable() if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) @@ -211,9 +204,9 @@ class ConsumerCoordinator(BaseCoordinator): # based on the received assignment assignor.on_assignment(assignment) - # restart the autocommit task if needed + # reschedule the auto commit starting from now if self._auto_commit_task: - self._auto_commit_task.enable() + self._auto_commit_task.reschedule() assigned = set(self._subscription.assigned_partitions()) log.info("Setting newly assigned partitions %s for group %s", @@ -396,10 +389,6 @@ class ConsumerCoordinator(BaseCoordinator): if self._auto_commit_task is None: return - # disable periodic commits prior to committing synchronously. note that they will - # be re-enabled after a rebalance completes - self._auto_commit_task.disable() - try: self.commit_offsets_sync(self._subscription.all_consumed_offsets()) @@ -672,47 +661,25 @@ class AutoCommitTask(object): self._coordinator = coordinator self._client = coordinator._client self._interval = interval - self._enabled = False - self._request_in_flight = False - - def enable(self): - if self._enabled: - log.warning("AutoCommitTask is already enabled") - return - - self._enabled = True - if not self._request_in_flight: - self._client.schedule(self, time.time() + self._interval) - def disable(self): - self._enabled = False - try: - self._client.unschedule(self) - except KeyError: - pass - - def _reschedule(self, at): - assert self._enabled, 'AutoCommitTask not enabled' + def reschedule(self, at=None): + if at is None: + at = time.time() + self._interval self._client.schedule(self, at) def __call__(self): - if not self._enabled: - return - if self._coordinator.coordinator_unknown(): log.debug("Cannot auto-commit offsets for group %s because the" " coordinator is unknown", self._coordinator.group_id) backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0 - self._client.schedule(self, time.time() + backoff) + self.reschedule(time.time() + backoff) return - self._request_in_flight = True self._coordinator.commit_offsets_async( self._coordinator._subscription.all_consumed_offsets(), self._handle_commit_response) def _handle_commit_response(self, offsets, result): - self._request_in_flight = False if result is True: log.debug("Successfully auto-committed offsets for group %s", self._coordinator.group_id) @@ -731,10 +698,7 @@ class AutoCommitTask(object): self._coordinator.group_id, result) next_at = time.time() + self._interval - if not self._enabled: - log.warning("Skipping auto-commit reschedule -- it is disabled") - return - self._reschedule(next_at) + self.reschedule(next_at) class ConsumerCoordinatorMetrics(object): diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 280fa70..35598e8 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -370,10 +370,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, side_effect=error) if has_auto_commit: assert coordinator._auto_commit_task is not None - # auto-commit enable is defered until after group join in 0.9+ - if api_version >= (0, 9): - coordinator._auto_commit_task.enable() - assert coordinator._auto_commit_task._enabled is True else: assert coordinator._auto_commit_task is None @@ -381,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, if has_auto_commit: assert coordinator._auto_commit_task is not None - assert coordinator._auto_commit_task._enabled is False assert commit_sync.call_count == (1 if commit_offsets else 0) assert mock_warn.call_count == (1 if warn else 0) |