diff options
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3d5669e..d5436c4 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -50,14 +50,45 @@ class ConsumerCoordinator(AbstractCoordinator): 'group_id': 'kafka-python-default-group', 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, - 'default_offset_commit_callback': lambda offsets, error: True, + 'default_offset_commit_callback': lambda offsets, response: True, 'assignors': (), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, } - """Initialize the coordination manager.""" + def __init__(self, client, subscription, **configs): + """Initialize the coordination manager. + + Keyword Arguments: + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + enable_auto_commit (bool): If true the consumer's offset will be + periodically committed in the background. Default: True. + auto_commit_interval_ms (int): milliseconds between automatic + offset commits, if enable_auto_commit is True. Default: 5000. + default_offset_commit_callback (callable): called as + callback(offsets, response) response will be either an Exception + or a OffsetCommitResponse struct. This callback can be used to + trigger custom actions when a commit request completes. + assignors (list): List of objects to use to distribute partition + ownership amongst consumer instances when group management is + used. Default: [RoundRobinPartitionAssignor] + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than session_timeout_ms, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group managementment facilities. Default: 30000 + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + """ super(ConsumerCoordinator, self).__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: |