summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/consumer.py52
-rw-r--r--test/test_coordinator.py5
2 files changed, 8 insertions, 49 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a18329c..517f66a 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -105,19 +105,12 @@ class ConsumerCoordinator(BaseCoordinator):
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
-
- # When using broker-coordinated consumer groups, auto-commit will
- # be automatically enabled on group join (see _on_join_complete)
- # Otherwise, we should enable now b/c there will be no group join
- if self.config['api_version'] < (0, 9):
- self._auto_commit_task.enable()
+ self._auto_commit_task.reschedule()
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
self._subscription)
def __del__(self):
- if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
- self._auto_commit_task.disable()
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
@@ -211,9 +204,9 @@ class ConsumerCoordinator(BaseCoordinator):
# based on the received assignment
assignor.on_assignment(assignment)
- # restart the autocommit task if needed
+ # reschedule the auto commit starting from now
if self._auto_commit_task:
- self._auto_commit_task.enable()
+ self._auto_commit_task.reschedule()
assigned = set(self._subscription.assigned_partitions())
log.info("Setting newly assigned partitions %s for group %s",
@@ -396,10 +389,6 @@ class ConsumerCoordinator(BaseCoordinator):
if self._auto_commit_task is None:
return
- # disable periodic commits prior to committing synchronously. note that they will
- # be re-enabled after a rebalance completes
- self._auto_commit_task.disable()
-
try:
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
@@ -672,47 +661,25 @@ class AutoCommitTask(object):
self._coordinator = coordinator
self._client = coordinator._client
self._interval = interval
- self._enabled = False
- self._request_in_flight = False
-
- def enable(self):
- if self._enabled:
- log.warning("AutoCommitTask is already enabled")
- return
-
- self._enabled = True
- if not self._request_in_flight:
- self._client.schedule(self, time.time() + self._interval)
- def disable(self):
- self._enabled = False
- try:
- self._client.unschedule(self)
- except KeyError:
- pass
-
- def _reschedule(self, at):
- assert self._enabled, 'AutoCommitTask not enabled'
+ def reschedule(self, at=None):
+ if at is None:
+ at = time.time() + self._interval
self._client.schedule(self, at)
def __call__(self):
- if not self._enabled:
- return
-
if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets for group %s because the"
" coordinator is unknown", self._coordinator.group_id)
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
- self._client.schedule(self, time.time() + backoff)
+ self.reschedule(time.time() + backoff)
return
- self._request_in_flight = True
self._coordinator.commit_offsets_async(
self._coordinator._subscription.all_consumed_offsets(),
self._handle_commit_response)
def _handle_commit_response(self, offsets, result):
- self._request_in_flight = False
if result is True:
log.debug("Successfully auto-committed offsets for group %s",
self._coordinator.group_id)
@@ -731,10 +698,7 @@ class AutoCommitTask(object):
self._coordinator.group_id, result)
next_at = time.time() + self._interval
- if not self._enabled:
- log.warning("Skipping auto-commit reschedule -- it is disabled")
- return
- self._reschedule(next_at)
+ self.reschedule(next_at)
class ConsumerCoordinatorMetrics(object):
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 280fa70..35598e8 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -370,10 +370,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
side_effect=error)
if has_auto_commit:
assert coordinator._auto_commit_task is not None
- # auto-commit enable is defered until after group join in 0.9+
- if api_version >= (0, 9):
- coordinator._auto_commit_task.enable()
- assert coordinator._auto_commit_task._enabled is True
else:
assert coordinator._auto_commit_task is None
@@ -381,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
if has_auto_commit:
assert coordinator._auto_commit_task is not None
- assert coordinator._auto_commit_task._enabled is False
assert commit_sync.call_count == (1 if commit_offsets else 0)
assert mock_warn.call_count == (1 if warn else 0)