diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-29 15:45:48 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 15:45:48 -0800 |
commit | ac57ada2bd73b5a7ab1c2060207c4c9c6f0ff9a9 (patch) | |
tree | dd19b6a6714f6481d1f3a99abdf6fd53c4e6625a | |
parent | ab1e1d014f439af3684d8704f7b679e9e865bb88 (diff) | |
download | kafka-python-ac57ada2bd73b5a7ab1c2060207c4c9c6f0ff9a9.tar.gz |
Pass auto commit parameters from KafkaConsumer to ConsumerCoordinator
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 2 |
2 files changed, 3 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 059c4ee..63a1b2e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -156,6 +156,8 @@ class KafkaConsumer(object): self._client, self._subscription, **kwargs) self._coordinator = ConsumerCoordinator( self._client, self._group_id, self._subscription, + enable_auto_commit=self._enable_auto_commit, + auto_commit_interval_ms=self._auto_commit_interval_ms, assignors=self._partition_assignment_strategy, **kwargs) self._closed = False diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index c17c593..119e372 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -46,7 +46,7 @@ class ConsumerProtocol(object): class ConsumerCoordinator(AbstractCoordinator): """This class manages the coordination process with the consumer coordinator.""" _enable_auto_commit = True - _auto_commit_interval_ms = 60 * 1000 + _auto_commit_interval_ms = 5000 _default_offset_commit_callback = lambda offsets, error: True _assignors = () #_heartbeat_interval_ms = 3000 |