summaryrefslogtreecommitdiff
path: root/kafka/coordinator/abstract.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/abstract.py')
-rw-r--r--kafka/coordinator/abstract.py74
1 files changed, 50 insertions, 24 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 89996c8..7c16034 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -53,6 +53,25 @@ class AbstractCoordinator(object):
}
def __init__(self, client, **configs):
+ """
+ Keyword Arguments:
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group managementment facilities. Default: 30000
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ """
if not client:
raise Errors.IllegalStateError('a client is required to use'
' Group Coordinator')
@@ -79,7 +98,8 @@ class AbstractCoordinator(object):
Unique identifier for the class of protocols implements
(e.g. "consumer" or "connect").
- @return str protocol type name
+ Returns:
+ str: protocol type name
"""
pass
@@ -96,7 +116,8 @@ class AbstractCoordinator(object):
Note: metadata must be type bytes or support an encode() method
- @return [(protocol, metadata), ...]
+ Returns:
+ list: [(protocol, metadata), ...]
"""
pass
@@ -107,9 +128,10 @@ class AbstractCoordinator(object):
This is typically used to perform any cleanup from the previous
generation (such as committing offsets for the consumer)
- @param generation The previous generation or -1 if there was none
- @param member_id The identifier of this member in the previous group
- or '' if there was none
+ Arguments:
+ generation (int): The previous generation or -1 if there was none
+ member_id (str): The identifier of this member in the previous group
+ or '' if there was none
"""
pass
@@ -120,14 +142,16 @@ class AbstractCoordinator(object):
This is used by the leader to push state to all the members of the group
(e.g. to push partition assignments in the case of the new consumer)
- @param leader_id: The id of the leader (which is this member)
- @param protocol: the chosen group protocol (assignment strategy)
- @param members: [(member_id, metadata_bytes)] from JoinGroupResponse.
- metadata_bytes are associated with the chosen group
- protocol, and the Coordinator subclass is responsible
- for decoding metadata_bytes based on that protocol.
+ Arguments:
+ leader_id (str): The id of the leader (which is this member)
+ protocol (str): the chosen group protocol (assignment strategy)
+ members (list): [(member_id, metadata_bytes)] from
+ JoinGroupResponse. metadata_bytes are associated with the chosen
+ group protocol, and the Coordinator subclass is responsible for
+ decoding metadata_bytes based on that protocol.
- @return dict of {member_id: assignment}; assignment must either be bytes
+ Returns:
+ dict: {member_id: assignment}; assignment must either be bytes
or have an encode() method to convert to bytes
"""
pass
@@ -137,22 +161,23 @@ class AbstractCoordinator(object):
member_assignment_bytes):
"""Invoked when a group member has successfully joined a group.
- @param generation The generation that was joined
- @param member_id The identifier for the local member in the group
- @param protocol The protocol selected by the coordinator
- @param member_assignment_bytes The protocol-encoded assignment
- propagated from the group leader. The Coordinator instance is
- responsible for decoding based on the chosen protocol.
+ Arguments:
+ generation (int): the generation that was joined
+ member_id (str): the identifier for the local member in the group
+ protocol (str): the protocol selected by the coordinator
+ member_assignment_bytes (bytes): the protocol-encoded assignment
+ propagated from the group leader. The Coordinator instance is
+ responsible for decoding based on the chosen protocol.
"""
pass
def coordinator_unknown(self):
- """
- Check if we know who the coordinator is and we have an active connection
+ """Check if we know who the coordinator is and have an active connection
Side-effect: reset coordinator_id to None if connection failed
- @return True if the coordinator is unknown
+ Returns:
+ bool: True if the coordinator is unknown
"""
if self.coordinator_id is None:
return True
@@ -186,9 +211,10 @@ class AbstractCoordinator(object):
raise future.exception # pylint: disable-msg=raising-bad-type
def need_rejoin(self):
- """
- Check whether the group should be rejoined (e.g. if metadata changes)
- @return True if it should, False otherwise
+ """Check whether the group should be rejoined (e.g. if metadata changes)
+
+ Returns:
+ bool: True if it should, False otherwise
"""
return self.rejoin_needed