diff options
Diffstat (limited to 'kafka/coordinator/abstract.py')
-rw-r--r-- | kafka/coordinator/abstract.py | 74 |
1 files changed, 50 insertions, 24 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 89996c8..7c16034 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -53,6 +53,25 @@ class AbstractCoordinator(object): } def __init__(self, client, **configs): + """ + Keyword Arguments: + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group managementment facilities. Default: 30000 + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than session_timeout_ms, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + """ if not client: raise Errors.IllegalStateError('a client is required to use' ' Group Coordinator') @@ -79,7 +98,8 @@ class AbstractCoordinator(object): Unique identifier for the class of protocols implements (e.g. "consumer" or "connect"). - @return str protocol type name + Returns: + str: protocol type name """ pass @@ -96,7 +116,8 @@ class AbstractCoordinator(object): Note: metadata must be type bytes or support an encode() method - @return [(protocol, metadata), ...] + Returns: + list: [(protocol, metadata), ...] """ pass @@ -107,9 +128,10 @@ class AbstractCoordinator(object): This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer) - @param generation The previous generation or -1 if there was none - @param member_id The identifier of this member in the previous group - or '' if there was none + Arguments: + generation (int): The previous generation or -1 if there was none + member_id (str): The identifier of this member in the previous group + or '' if there was none """ pass @@ -120,14 +142,16 @@ class AbstractCoordinator(object): This is used by the leader to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer) - @param leader_id: The id of the leader (which is this member) - @param protocol: the chosen group protocol (assignment strategy) - @param members: [(member_id, metadata_bytes)] from JoinGroupResponse. - metadata_bytes are associated with the chosen group - protocol, and the Coordinator subclass is responsible - for decoding metadata_bytes based on that protocol. + Arguments: + leader_id (str): The id of the leader (which is this member) + protocol (str): the chosen group protocol (assignment strategy) + members (list): [(member_id, metadata_bytes)] from + JoinGroupResponse. metadata_bytes are associated with the chosen + group protocol, and the Coordinator subclass is responsible for + decoding metadata_bytes based on that protocol. - @return dict of {member_id: assignment}; assignment must either be bytes + Returns: + dict: {member_id: assignment}; assignment must either be bytes or have an encode() method to convert to bytes """ pass @@ -137,22 +161,23 @@ class AbstractCoordinator(object): member_assignment_bytes): """Invoked when a group member has successfully joined a group. - @param generation The generation that was joined - @param member_id The identifier for the local member in the group - @param protocol The protocol selected by the coordinator - @param member_assignment_bytes The protocol-encoded assignment - propagated from the group leader. The Coordinator instance is - responsible for decoding based on the chosen protocol. + Arguments: + generation (int): the generation that was joined + member_id (str): the identifier for the local member in the group + protocol (str): the protocol selected by the coordinator + member_assignment_bytes (bytes): the protocol-encoded assignment + propagated from the group leader. The Coordinator instance is + responsible for decoding based on the chosen protocol. """ pass def coordinator_unknown(self): - """ - Check if we know who the coordinator is and we have an active connection + """Check if we know who the coordinator is and have an active connection Side-effect: reset coordinator_id to None if connection failed - @return True if the coordinator is unknown + Returns: + bool: True if the coordinator is unknown """ if self.coordinator_id is None: return True @@ -186,9 +211,10 @@ class AbstractCoordinator(object): raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): - """ - Check whether the group should be rejoined (e.g. if metadata changes) - @return True if it should, False otherwise + """Check whether the group should be rejoined (e.g. if metadata changes) + + Returns: + bool: True if it should, False otherwise """ return self.rejoin_needed |