summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py35
1 files changed, 33 insertions, 2 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 3d5669e..d5436c4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -50,14 +50,45 @@ class ConsumerCoordinator(AbstractCoordinator):
'group_id': 'kafka-python-default-group',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
- 'default_offset_commit_callback': lambda offsets, error: True,
+ 'default_offset_commit_callback': lambda offsets, response: True,
'assignors': (),
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
}
- """Initialize the coordination manager."""
+ def __init__(self, client, subscription, **configs):
+ """Initialize the coordination manager.
+
+ Keyword Arguments:
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ enable_auto_commit (bool): If true the consumer's offset will be
+ periodically committed in the background. Default: True.
+ auto_commit_interval_ms (int): milliseconds between automatic
+ offset commits, if enable_auto_commit is True. Default: 5000.
+ default_offset_commit_callback (callable): called as
+ callback(offsets, response) response will be either an Exception
+ or a OffsetCommitResponse struct. This callback can be used to
+ trigger custom actions when a commit request completes.
+ assignors (list): List of objects to use to distribute partition
+ ownership amongst consumer instances when group management is
+ used. Default: [RoundRobinPartitionAssignor]
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group managementment facilities. Default: 30000
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ """
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config: