summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index fda80aa..971f5e8 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,6 +11,7 @@ from kafka.vendor import six
from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
import kafka.errors as Errors
from kafka.future import Future
@@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': None,
- 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
+ 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor),
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'max_poll_interval_ms': 300000,
@@ -234,6 +235,8 @@ class ConsumerCoordinator(BaseCoordinator):
# give the assignor a chance to update internal state
# based on the received assignment
assignor.on_assignment(assignment)
+ if assignor.name == 'sticky':
+ assignor.on_generation_assignment(generation)
# reschedule the auto commit starting from now
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval