author     Dana Powers <dana.powers@rd.io>    2015-12-30 12:51:34 -0800
committer  Dana Powers <dana.powers@rd.io>    2015-12-30 12:51:34 -0800
commit     e093ffefaecb59c26f2e480214f72a03ba5a49fc
tree       88b4bd434b29a751b7e0adebbe4e7514a4c7e844
parent     1dd9e8bb05b6efc2888ac4cae8e7199b35dd633f
download   kafka-python-e093ffefaecb59c26f2e480214f72a03ba5a49fc.tar.gz
More Docstring Improvements
-rw-r--r--  kafka/client_async.py                      191
-rw-r--r--  kafka/consumer/fetcher.py                  133
-rw-r--r--  kafka/consumer/group.py                     10
-rw-r--r--  kafka/consumer/subscription_state.py       113
-rw-r--r--  kafka/coordinator/abstract.py               74
-rw-r--r--  kafka/coordinator/assignors/abstract.py     27
-rw-r--r--  kafka/coordinator/consumer.py               35
7 files changed, 431 insertions, 152 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e8ab961..87d616c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -40,6 +40,33 @@ class KafkaClient(object): } def __init__(self, **configs): + """Initialize an asynchronous kafka client + + Keyword Arguments: + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the consumer should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. + client_id (str): a name for this client. This string is passed in + each request to servers and can be used to identify specific + server-side log entries that correspond to this client. Also + submitted to GroupCoordinator for logging with respect to + consumer group administration. Default: 'kafka-python-{version}' + request_timeout_ms (int): Client request timeout in milliseconds. + Default: 40000. + reconnect_backoff_ms (int): The amount of time in milliseconds to + wait before attempting to reconnect to a given host. + Default: 50. + max_in_flight_requests_per_connection (int): Requests are pipelined + to kafka brokers up to this number of maximum requests per + broker connection. Default: 5. + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: 131072 + receive_buffer_bytes (int): The size of the TCP receive buffer + (SO_RCVBUF) to use when reading data. Default: 32768 + """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: @@ -128,12 +155,13 @@ class KafkaClient(object): return state def ready(self, node_id): - """ - Begin connecting to the given node, return true if we are already - connected and ready to send to that node. + """Check whether a node is connected and ok to send more requests. - @param node_id The id of the node to check - @return True if we are ready to send to the given node + Arguments: + node_id (int): the id of the node to check + + Returns: + bool: True if we are ready to send to the given node """ if self.is_ready(node_id): return True @@ -151,7 +179,8 @@ class KafkaClient(object): def close(self, node_id=None): """Closes the connection to a particular node (if there is one). - @param node_id The id of the node + Arguments: + node_id (int): the id of the node to close """ if node_id is None: for conn in self._conns.values(): @@ -163,27 +192,34 @@ class KafkaClient(object): return def is_disconnected(self, node_id): + """Check whether the node connection has been disconnected failed. + A disconnected node has either been closed or has failed. Connection + failures are usually transient and can be resumed in the next ready() + call, but there are cases where transient failures need to be caught + and re-acted upon. - """ - Check if the connection of the node has failed, based on the connection - state. Such connection failures are usually transient and can be resumed - in the next ready(node) call, but there are cases where transient - failures need to be caught and re-acted upon. 
+ Arguments: + node_id (int): the id of the node to check - @param node_id the id of the node to check - @return true iff the connection has failed and the node is disconnected + Returns: + bool: True iff the node exists and is disconnected """ if node_id not in self._conns: return False return self._conns[node_id].state is ConnectionStates.DISCONNECTED def is_ready(self, node_id): - """ - Check if the node with the given id is ready to send more requests. + """Check whether a node is ready to send more requests. + + In addition to connection-level checks, this method also is used to + block additional requests from being sent during a metadata refresh. + + Arguments: + node_id (int): id of the node to check - @param node_id The id of the node - @return true if the node is ready + Returns: + bool: True if the node is ready and metadata is not refreshing """ # if we need to update our metadata now declare all requests unready to # make metadata requests first priority @@ -199,12 +235,17 @@ class KafkaClient(object): return conn.connected() and conn.can_send_more() def send(self, node_id, request): - """ - Send the given request. Requests can only be sent out to ready nodes. + """Send a request to a specific node. + + Arguments: + node_id (int): destination node + request (Struct): request object (not-encoded) - @param node destination node - @param request The request - @param now The current timestamp + Raises: + IllegalStateError: if node_id is not ready + + Returns: + Future: resolves to Response struct """ if not self._can_send_request(node_id): raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id) @@ -217,15 +258,20 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) def poll(self, timeout_ms=None, future=None): - """Do actual reads and writes to sockets. - - @param timeout_ms The maximum amount of time to wait (in ms) for - responses if there are none available immediately. - Must be non-negative. The actual timeout will be the - minimum of timeout, request timeout and metadata - timeout. If unspecified, default to request_timeout_ms - @param future Optionally block until the provided future completes. - @return The list of responses received. + """Try to read and write to sockets. + + This method will also attempt to complete node connections, refresh + stale metadata, and run previously-scheduled tasks. + + Arguments: + timeout_ms (int, optional): maximum amount of time to wait (in ms) + for at least one response. Must be non-negative. The actual + timeout will be the minimum of timeout, request timeout and + metadata timeout. Default: request_timeout_ms + future (Future, optional): if provided, blocks until future.is_done + + Returns: + list: responses received (can be empty) """ if timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] @@ -283,7 +329,15 @@ class KafkaClient(object): return responses def in_flight_request_count(self, node_id=None): - """Get the number of in-flight requests""" + """Get the number of in-flight requests for a node or all nodes. + + Arguments: + node_id (int, optional): a specific node to check. 
If unspecified, + return the total for all nodes + + Returns: + int: pending in-flight requests for the node, or all nodes if None + """ if node_id is not None: if node_id not in self._conns: return 0 @@ -292,16 +346,17 @@ class KafkaClient(object): return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) def least_loaded_node(self): - """ - Choose the node with the fewest outstanding requests which is at least - eligible for connection. This method will prefer a node with an - existing connection, but will potentially choose a node for which we - don't yet have a connection if all existing connections are in use. - This method will never choose a node for which there is no existing - connection and from which we have disconnected within the reconnect - backoff period. - - @return The node_id with the fewest in-flight requests. + """Choose the node with fewest outstanding requests, with fallbacks. + + This method will prefer a node with an existing connection, but will + potentially choose a node for which we don't yet have a connection if + all existing connections are in use. This method will never choose a + node that was disconnected within the reconnect backoff period. + If all else fails, the method will attempt to bootstrap again using the + bootstrap_servers list. + + Returns: + node_id or None if no suitable node was found """ nodes = list(self._conns.keys()) random.shuffle(nodes) @@ -339,10 +394,13 @@ class KafkaClient(object): return None def set_topics(self, topics): - """ - Set specific topics to track for metadata + """Set specific topics to track for metadata. + + Arguments: + topics (list of str): topics to check for metadata - Returns a future that will complete after metadata request/response + Returns: + Future: resolves after metadata request/response """ if set(topics).difference(self._topics): future = self.cluster.request_update() @@ -353,7 +411,11 @@ class KafkaClient(object): # request metadata update on disconnect and timedout def _maybe_refresh_metadata(self): - """Send a metadata request if needed""" + """Send a metadata request if needed. + + Returns: + int: milliseconds until next refresh + """ ttl = self.cluster.ttl() if ttl > 0: return ttl @@ -383,26 +445,30 @@ class KafkaClient(object): return 0 def schedule(self, task, at): - """ - Schedule a new task to be executed at the given time. + """Schedule a new task to be executed at the given time. This is "best-effort" scheduling and should only be used for coarse synchronization. A task cannot be scheduled for multiple times simultaneously; any previously scheduled instance of the same task will be cancelled. - @param task The task to be scheduled -- function or implement __call__ - @param at Epoch seconds when it should run (see time.time()) - @returns Future + Arguments: + task (callable): task to be scheduled + at (float or int): epoch seconds when task should run + + Returns: + Future: resolves to result of task call, or exception if raised """ return self._delayed_tasks.add(task, at) def unschedule(self, task): - """ - Unschedule a task. This will remove all instances of the task from the task queue. + """Unschedule a task. + + This will remove all instances of the task from the task queue. This is a no-op if the task is not scheduled. - @param task The task to be unscheduled. 
+ Arguments: + task (callable): task to be unscheduled """ self._delayed_tasks.remove(task) @@ -415,10 +481,14 @@ class DelayedTaskQueue(object): self._counter = itertools.count() # unique sequence count def add(self, task, at): - """Add a task to run at a later time + """Add a task to run at a later time. + + Arguments: + task: can be anything, but generally a callable + at (float or int): epoch seconds to schedule task - task: anything - at: seconds from epoch to schedule task (see time.time()) + Returns: + Future: a future that will be returned with the task when ready """ if task in self._task_map: self.remove(task) @@ -430,9 +500,10 @@ class DelayedTaskQueue(object): return future def remove(self, task): - """Remove a previously scheduled task + """Remove a previously scheduled task. - Raises KeyError if task is not found + Raises: + KeyError: if task is not found """ entry = self._task_map.pop(task) task, future = entry[-1] @@ -456,7 +527,7 @@ class DelayedTaskQueue(object): return (task, future) def next_at(self): - """Number of seconds until next task is ready""" + """Number of seconds until next task is ready.""" self._drop_removed() if not self._tasks: return sys.maxint diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 39e1244..a4be7ae 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -39,6 +39,33 @@ class Fetcher(object): } def __init__(self, client, subscriptions, **configs): + """Initialize a Kafka Message Fetcher. + + Keyword Arguments: + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + check_crcs (bool): Automatically check the CRC32 of the records + consumed. This ensures no on-the-wire or on-disk corruption to + the messages occurred. This check adds some overhead, so it may + be disabled in cases seeking extreme performance. Default: True + """ #metrics=None, #metric_group_prefix='consumer', self.config = copy.copy(self.DEFAULT_CONFIG) @@ -56,7 +83,11 @@ class Fetcher(object): #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) def init_fetches(self): - """Send FetchRequests asynchronously for all assigned partitions""" + """Send FetchRequests asynchronously for all assigned partitions. 
+ + Returns: + List of Futures: each future resolves to a FetchResponse + """ futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -70,8 +101,11 @@ class Fetcher(object): def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. - @param partitions: iterable of TopicPartitions - @raises NoOffsetForPartitionError If no offset is stored for a given + Arguments: + partitions (list of TopicPartitions): partitions to update + + Raises: + NoOffsetForPartitionError: if no offset is stored for a given partition and no reset policy is available """ # reset the fetch position to the committed position @@ -104,8 +138,11 @@ class Fetcher(object): def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. - @param partition The given partition that needs reset offset - @raises NoOffsetForPartitionError If no offset reset strategy is defined + Arguments: + partition (TopicPartition): the partition that needs reset offset + + Raises: + NoOffsetForPartitionError: if no offset reset strategy is defined """ timestamp = self._subscriptions.assignment[partition].reset_strategy if timestamp is OffsetResetStrategy.EARLIEST: @@ -129,11 +166,14 @@ class Fetcher(object): Blocks until offset is obtained, or a non-retriable exception is raised - @param partition The partition that needs fetching offset. - @param timestamp The timestamp for fetching offset. - @raises exceptions - @return The offset of the message that is published before the given - timestamp + Arguments: + partition The partition that needs fetching offset. + timestamp (int): timestamp for fetching offset. -1 for the latest + available, -2 for the earliest available. Otherwise timestamp + is treated as epoch seconds. + + Returns: + int: message offset """ while True: future = self._send_offset_request(partition, timestamp) @@ -150,10 +190,12 @@ class Fetcher(object): self._client.poll(future=refresh_future) def _raise_if_offset_out_of_range(self): - """ - If any partition from previous FetchResponse contains - OffsetOutOfRangeError and the default_reset_policy is None, - raise OffsetOutOfRangeError + """Check FetchResponses for offset out of range. + + Raises: + OffsetOutOfRangeError: if any partition from previous FetchResponse + contains OffsetOutOfRangeError and the default_reset_policy is + None """ current_out_of_range_partitions = {} @@ -174,11 +216,10 @@ class Fetcher(object): raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions) def _raise_if_unauthorized_topics(self): - """ - If any topic from previous FetchResponse contains an Authorization - error, raise an exception + """Check FetchResponses for topic authorization failures. - @raise TopicAuthorizationFailedError + Raises: + TopicAuthorizationFailedError """ if self._unauthorized_topics: topics = set(self._unauthorized_topics) @@ -186,12 +227,10 @@ class Fetcher(object): raise Errors.TopicAuthorizationFailedError(topics) def _raise_if_record_too_large(self): - """ - If any partition from previous FetchResponse gets a RecordTooLarge - error, raise RecordTooLargeError + """Check FetchResponses for messages larger than the max per partition. 
- @raise RecordTooLargeError If there is a message larger than fetch size - and hence cannot be ever returned + Raises: + RecordTooLargeError: if there is a message larger than fetch size """ copied_record_too_large_partitions = dict(self._record_too_large_partitions) self._record_too_large_partitions.clear() @@ -207,12 +246,21 @@ class Fetcher(object): self.config['max_partition_fetch_bytes']) def fetched_records(self): - """Returns previously fetched records and updates consumed offsets + """Returns previously fetched records and updates consumed offsets. NOTE: returning empty records guarantees the consumed position are NOT updated. - @return {TopicPartition: deque([messages])} - @raises OffsetOutOfRangeError if no subscription offset_reset_strategy + Raises: + OffsetOutOfRangeError: if no subscription offset_reset_strategy + InvalidMessageError: if message crc validation fails (check_crcs + must be set to True) + RecordTooLargeError: if a message is larger than the currently + configured max_partition_fetch_bytes + TopicAuthorizationError: if consumer is not authorized to fetch + messages from the topic + + Returns: + dict: {TopicPartition: deque([messages])} """ if self._subscriptions.needs_partition_assignment: return {} @@ -280,12 +328,14 @@ class Fetcher(object): return key, value def _send_offset_request(self, partition, timestamp): - """ - Fetch a single offset before the given timestamp for the partition. + """Fetch a single offset before the given timestamp for the partition. - @param partition The TopicPartition that needs fetching offset. - @param timestamp The timestamp for fetching offset. - @return A future which can be polled to obtain the corresponding offset. + Arguments: + partition (TopicPartition): partition that needs fetching offset + timestamp (int): timestamp for fetching offset + + Returns: + Future: resolves to the corresponding offset """ node_id = self._client.cluster.leader_for_partition(partition) if node_id is None: @@ -315,11 +365,13 @@ class Fetcher(object): def _handle_offset_response(self, partition, future, response): """Callback for the response of the list offset call above. - @param partition The partition that was fetched - @param future the future to update based on response - @param response The OffsetResponse from the server + Arguments: + partition (TopicPartition): The partition that was fetched + future (Future): the future to update based on response + response (OffsetResponse): response from the server - @raises IllegalStateError if response does not match partition + Raises: + IllegalStateError: if response does not match partition """ topic, partition_info = response.topics[0] if len(response.topics) != 1 or len(partition_info) != 1: @@ -351,10 +403,13 @@ class Fetcher(object): future.failure(error_type(partition)) def _create_fetch_requests(self): - """ - Create fetch requests for all assigned partitions, grouped by node - Except where no leader, node has requests in flight, or we have - not returned all previously fetched records to consumer + """Create fetch requests for all assigned partitions, grouped by node. 
+ + FetchRequests skipped if no leader, node has requests in flight, or we + have not returned all previously fetched records to consumer + + Returns: + dict: {node_id: [FetchRequest,...]} """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 00955f8..14485d2 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -114,6 +114,10 @@ class KafkaConsumer(object): periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000. + default_offset_commit_callback (callable): called as + callback(offsets, response) response will be either an Exception + or a OffsetCommitResponse struct. This callback can be used to + trigger custom actions when a commit request completes. check_crcs (bool): Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may @@ -438,13 +442,17 @@ class KafkaConsumer(object): self._subscription.resume(partition) def seek(self, partition, offset): - """Manually specify the fetch offset for a TopicPartition + """Manually specify the fetch offset for a TopicPartition. Overrides the fetch offsets that the consumer will use on the next poll(). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets. + + Arguments: + partition (TopicPartition): partition for seek operation + offset (int): message offset in partition """ if offset < 0: raise Errors.IllegalStateError("seek offset must not be a negative number") diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 38d4571..fa36bc2 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -42,10 +42,10 @@ class SubscriptionState(object): def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance - offset_reset_strategy: 'earliest' or 'latest', otherwise - exception will be raised when fetching an offset - that is no longer available. - Defaults to earliest. + Keyword Arguments: + offset_reset_strategy: 'earliest' or 'latest', otherwise + exception will be raised when fetching an offset that is no + longer available. Default: 'earliest' """ try: offset_reset_strategy = getattr(OffsetResetStrategy, @@ -67,14 +67,39 @@ class SubscriptionState(object): self.needs_fetch_committed_offsets = True def subscribe(self, topics=(), pattern=None, listener=None): - """Subscribe to a list of topics, or a topic regex pattern + """Subscribe to a list of topics, or a topic regex pattern. - Partitions will be assigned via a group coordinator - (incompatible with assign_from_user) + Partitions will be dynamically assigned via a group coordinator. + Topic subscriptions are not incremental: this list will replace the + current assignment (if there is one). - Optionally include listener callback, which must be a - ConsumerRebalanceListener and will be called before and - after each rebalance operation. + This method is incompatible with assign_from_user() + + Arguments: + topics (list): List of topics for subscription. + pattern (str): Pattern to match available topics. 
You must provide + either topics or pattern, but not both. + listener (ConsumerRebalanceListener): Optionally include listener + callback, which will be called before and after each rebalance + operation. + + As part of group management, the consumer will keep track of the + list of consumers that belong to a particular group and will + trigger a rebalance operation if one of the following events + trigger: + + * Number of partitions change for any of the subscribed topics + * Topic is created or deleted + * An existing member of the consumer group dies + * A new member is added to the consumer group + + When any of these events are triggered, the provided listener + will be invoked first to indicate that the consumer's assignment + has been revoked, and then again when the new assignment has + been received. Note that this listener will immediately override + any listener set in a previous call to subscribe. It is + guaranteed, however, that the partitions revoked/assigned + through this interface are from topics subscribed in this call. """ if self._user_assignment or (topics and pattern): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -93,6 +118,14 @@ class SubscriptionState(object): self.listener = listener def change_subscription(self, topics): + """Change the topic subscription. + + Arguments: + topics (list of str): topics for subscription + + Raises: + IllegalStateErrror: if assign_from_user has been used already + """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -117,7 +150,8 @@ class SubscriptionState(object): This is used by the group leader to ensure that it receives metadata updates for all topics that any member of the group is subscribed to. - @param topics list of topics to add to the group subscription + Arguments: + topics (list of str): topics to add to the group subscription """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -128,12 +162,22 @@ class SubscriptionState(object): self.needs_partition_assignment = True def assign_from_user(self, partitions): - """ - Change the assignment to the specified partitions provided by the user, - note this is different from assign_from_subscribed() - whose input partitions are provided from the subscribed topics. + """Manually assign a list of TopicPartitions to this consumer. + + This interface does not allow for incremental assignment and will + replace the previous assignment (if there was one). - @param partitions: list (or iterable) of TopicPartition() + Manual topic assignment through this method does not use the consumer's + group management functionality. As such, there will be no rebalance + operation triggered when group membership or cluster and topic metadata + change. Note that it is not possible to use both manual partition + assignment with assign() and group assignment with subscribe(). + + Arguments: + partitions (list of TopicPartition): assignment for this instance. 
+ + Raises: + IllegalStateError: if consumer has already called subscribe() """ if self.subscription is not None: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -175,6 +219,7 @@ class SubscriptionState(object): log.info("Updated partition assignment: %s", assignments) def unsubscribe(self): + """Clear all topic subscriptions and partition assignments""" self.subscription = None self._user_assignment.clear() self.assignment.clear() @@ -191,17 +236,32 @@ class SubscriptionState(object): that would require rebalancing (the leader fetches metadata for all topics in the group so that it can do partition assignment). - @return set of topics + Returns: + set: topics """ return self._group_subscription def seek(self, partition, offset): + """Manually specify the fetch offset for a TopicPartition. + + Overrides the fetch offsets that the consumer will use on the next + poll(). If this API is invoked for the same partition more than once, + the latest offset will be used on the next poll(). Note that you may + lose data if this API is arbitrarily used in the middle of consumption, + to reset the fetch offsets. + + Arguments: + partition (TopicPartition): partition for seek operation + offset (int): message offset in partition + """ self.assignment[partition].seek(offset) def assigned_partitions(self): + """Return set of TopicPartitions in current assignment.""" return set(self.assignment.keys()) def fetchable_partitions(self): + """Return set of TopicPartitions that should be Fetched.""" fetchable = set() for partition, state in six.iteritems(self.assignment): if state.is_fetchable(): @@ -209,6 +269,7 @@ class SubscriptionState(object): return fetchable def partitions_auto_assigned(self): + """Return True unless user supplied partitions manually.""" return self.subscription is not None def all_consumed_offsets(self): @@ -220,11 +281,18 @@ class SubscriptionState(object): return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): + """Mark partition for offset reset using specified or default strategy. + + Arguments: + partition (TopicPartition): partition to mark + offset_reset_strategy (OffsetResetStrategy, optional) + """ if offset_reset_strategy is None: offset_reset_strategy = self._default_offset_reset_strategy self.assignment[partition].await_reset(offset_reset_strategy) def has_default_offset_reset_policy(self): + """Return True if default offset reset policy is Earliest or Latest""" return self._default_offset_reset_strategy != OffsetResetStrategy.NONE def is_offset_reset_needed(self, partition): @@ -372,8 +440,9 @@ class ConsumerRebalanceListener(object): NOTE: This method is only called before rebalances. It is not called prior to KafkaConsumer.close() - @param partitions The list of partitions that were assigned to the - consumer on the last rebalance + Arguments: + revoked (list of TopicPartition): the partitions that were assigned + to the consumer on the last rebalance """ pass @@ -389,8 +458,8 @@ class ConsumerRebalanceListener(object): their on_partitions_revoked() callback before any instance executes its on_partitions_assigned() callback. 
- @param partitions The list of partitions that are now assigned to the - consumer (may include partitions previously assigned - to the consumer) + Arguments: + assigned (list of TopicPartition): the partitions assigned to the + consumer (may include partitions that were previously assigned) """ pass diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 89996c8..7c16034 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -53,6 +53,25 @@ class AbstractCoordinator(object): } def __init__(self, client, **configs): + """ + Keyword Arguments: + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group managementment facilities. Default: 30000 + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than session_timeout_ms, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + """ if not client: raise Errors.IllegalStateError('a client is required to use' ' Group Coordinator') @@ -79,7 +98,8 @@ class AbstractCoordinator(object): Unique identifier for the class of protocols implements (e.g. "consumer" or "connect"). - @return str protocol type name + Returns: + str: protocol type name """ pass @@ -96,7 +116,8 @@ class AbstractCoordinator(object): Note: metadata must be type bytes or support an encode() method - @return [(protocol, metadata), ...] + Returns: + list: [(protocol, metadata), ...] """ pass @@ -107,9 +128,10 @@ class AbstractCoordinator(object): This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer) - @param generation The previous generation or -1 if there was none - @param member_id The identifier of this member in the previous group - or '' if there was none + Arguments: + generation (int): The previous generation or -1 if there was none + member_id (str): The identifier of this member in the previous group + or '' if there was none """ pass @@ -120,14 +142,16 @@ class AbstractCoordinator(object): This is used by the leader to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer) - @param leader_id: The id of the leader (which is this member) - @param protocol: the chosen group protocol (assignment strategy) - @param members: [(member_id, metadata_bytes)] from JoinGroupResponse. - metadata_bytes are associated with the chosen group - protocol, and the Coordinator subclass is responsible - for decoding metadata_bytes based on that protocol. + Arguments: + leader_id (str): The id of the leader (which is this member) + protocol (str): the chosen group protocol (assignment strategy) + members (list): [(member_id, metadata_bytes)] from + JoinGroupResponse. metadata_bytes are associated with the chosen + group protocol, and the Coordinator subclass is responsible for + decoding metadata_bytes based on that protocol. 
- @return dict of {member_id: assignment}; assignment must either be bytes + Returns: + dict: {member_id: assignment}; assignment must either be bytes or have an encode() method to convert to bytes """ pass @@ -137,22 +161,23 @@ class AbstractCoordinator(object): member_assignment_bytes): """Invoked when a group member has successfully joined a group. - @param generation The generation that was joined - @param member_id The identifier for the local member in the group - @param protocol The protocol selected by the coordinator - @param member_assignment_bytes The protocol-encoded assignment - propagated from the group leader. The Coordinator instance is - responsible for decoding based on the chosen protocol. + Arguments: + generation (int): the generation that was joined + member_id (str): the identifier for the local member in the group + protocol (str): the protocol selected by the coordinator + member_assignment_bytes (bytes): the protocol-encoded assignment + propagated from the group leader. The Coordinator instance is + responsible for decoding based on the chosen protocol. """ pass def coordinator_unknown(self): - """ - Check if we know who the coordinator is and we have an active connection + """Check if we know who the coordinator is and have an active connection Side-effect: reset coordinator_id to None if connection failed - @return True if the coordinator is unknown + Returns: + bool: True if the coordinator is unknown """ if self.coordinator_id is None: return True @@ -186,9 +211,10 @@ class AbstractCoordinator(object): raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): - """ - Check whether the group should be rejoined (e.g. if metadata changes) - @return True if it should, False otherwise + """Check whether the group should be rejoined (e.g. if metadata changes) + + Returns: + bool: True if it should, False otherwise """ return self.rejoin_needed diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index ed09a6e..773280a 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -19,17 +19,36 @@ class AbstractPartitionAssignor(object): def assign(self, cluster, members): """Perform group assignment given cluster metadata and member subscriptions - @param cluster: cluster metadata - @param members: {member_id: subscription} - @return {member_id: MemberAssignment} + Arguments: + cluster (ClusterMetadata): metadata for use in assignment + members (dict of {member_id: MemberMetadata}): decoded metadata for + each member in the group. + + Returns: + dict: {member_id: MemberAssignment} """ pass @abc.abstractmethod def metadata(self, topics): - """return ProtocolMetadata to be submitted via JoinGroupRequest""" + """Generate ProtocolMetadata to be submitted via JoinGroupRequest. + + Arguments: + topics (set): a member's subscribed topics + + Returns: + MemberMetadata struct + """ pass @abc.abstractmethod def on_assignment(self, assignment): + """Callback that runs on each assignment. + + This method can be used to update internal state, if any, of the + partition assignor. 
+ + Arguments: + assignment (MemberAssignment): the member's assignment + """ pass diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3d5669e..d5436c4 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -50,14 +50,45 @@ class ConsumerCoordinator(AbstractCoordinator): 'group_id': 'kafka-python-default-group', 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, - 'default_offset_commit_callback': lambda offsets, error: True, + 'default_offset_commit_callback': lambda offsets, response: True, 'assignors': (), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, } - """Initialize the coordination manager.""" + def __init__(self, client, subscription, **configs): + """Initialize the coordination manager. + + Keyword Arguments: + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + enable_auto_commit (bool): If true the consumer's offset will be + periodically committed in the background. Default: True. + auto_commit_interval_ms (int): milliseconds between automatic + offset commits, if enable_auto_commit is True. Default: 5000. + default_offset_commit_callback (callable): called as + callback(offsets, response) response will be either an Exception + or a OffsetCommitResponse struct. This callback can be used to + trigger custom actions when a commit request completes. + assignors (list): List of objects to use to distribute partition + ownership amongst consumer instances when group management is + used. Default: [RoundRobinPartitionAssignor] + heartbeat_interval_ms (int): The expected time in milliseconds + between heartbeats to the consumer coordinator when using + Kafka's group management feature. Heartbeats are used to ensure + that the consumer's session stays active and to facilitate + rebalancing when new consumers join or leave the group. The + value must be set lower than session_timeout_ms, but typically + should be set no higher than 1/3 of that value. It can be + adjusted even lower to control the expected time for normal + rebalances. Default: 3000 + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group managementment facilities. Default: 30000 + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + """ super(ConsumerCoordinator, self).__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: |
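
For reference, the docstrings added above describe the async client's ready/send/poll flow. Below is a minimal usage sketch of that flow as documented in this commit; it is not part of the diff. The MetadataRequest import path, the broker address, and the client_id value are illustrative assumptions, not taken from this change.

```python
# Minimal sketch of the KafkaClient flow documented above.
# Assumptions: kafka-python at roughly this commit; the MetadataRequest
# import path and 'localhost:9092' are illustrative only.
from kafka.client_async import KafkaClient
from kafka.protocol.metadata import MetadataRequest

client = KafkaClient(bootstrap_servers='localhost:9092',
                     client_id='docstring-demo',
                     request_timeout_ms=40000)

# Pick the broker with the fewest in-flight requests (may be None).
node_id = client.least_loaded_node()

# ready() begins connecting and returns True only once the node can accept
# requests; a real caller would poll() and retry until it returns True.
if node_id is not None and client.ready(node_id):
    # send() returns a Future that resolves to the response struct.
    future = client.send(node_id, MetadataRequest([]))
    # poll() reads/writes sockets; passing the future blocks until it is done.
    client.poll(future=future)
    if future.succeeded():
        print(future.value)        # decoded MetadataResponse struct
    else:
        print(future.exception)    # e.g. a disconnect or request error
```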