diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-30 12:51:34 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 12:51:34 -0800 |
commit | e093ffefaecb59c26f2e480214f72a03ba5a49fc (patch) | |
tree | 88b4bd434b29a751b7e0adebbe4e7514a4c7e844 /kafka/client_async.py | |
parent | 1dd9e8bb05b6efc2888ac4cae8e7199b35dd633f (diff) | |
download | kafka-python-e093ffefaecb59c26f2e480214f72a03ba5a49fc.tar.gz |
More Docstring Improvements
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 191 |
1 files changed, 131 insertions, 60 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e8ab961..87d616c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -40,6 +40,33 @@ class KafkaClient(object): } def __init__(self, **configs): + """Initialize an asynchronous kafka client + + Keyword Arguments: + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the consumer should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. + client_id (str): a name for this client. This string is passed in + each request to servers and can be used to identify specific + server-side log entries that correspond to this client. Also + submitted to GroupCoordinator for logging with respect to + consumer group administration. Default: 'kafka-python-{version}' + request_timeout_ms (int): Client request timeout in milliseconds. + Default: 40000. + reconnect_backoff_ms (int): The amount of time in milliseconds to + wait before attempting to reconnect to a given host. + Default: 50. + max_in_flight_requests_per_connection (int): Requests are pipelined + to kafka brokers up to this number of maximum requests per + broker connection. Default: 5. + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: 131072 + receive_buffer_bytes (int): The size of the TCP receive buffer + (SO_RCVBUF) to use when reading data. Default: 32768 + """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: @@ -128,12 +155,13 @@ class KafkaClient(object): return state def ready(self, node_id): - """ - Begin connecting to the given node, return true if we are already - connected and ready to send to that node. + """Check whether a node is connected and ok to send more requests. - @param node_id The id of the node to check - @return True if we are ready to send to the given node + Arguments: + node_id (int): the id of the node to check + + Returns: + bool: True if we are ready to send to the given node """ if self.is_ready(node_id): return True @@ -151,7 +179,8 @@ class KafkaClient(object): def close(self, node_id=None): """Closes the connection to a particular node (if there is one). - @param node_id The id of the node + Arguments: + node_id (int): the id of the node to close """ if node_id is None: for conn in self._conns.values(): @@ -163,27 +192,34 @@ class KafkaClient(object): return def is_disconnected(self, node_id): + """Check whether the node connection has been disconnected failed. + A disconnected node has either been closed or has failed. Connection + failures are usually transient and can be resumed in the next ready() + call, but there are cases where transient failures need to be caught + and re-acted upon. - """ - Check if the connection of the node has failed, based on the connection - state. Such connection failures are usually transient and can be resumed - in the next ready(node) call, but there are cases where transient - failures need to be caught and re-acted upon. + Arguments: + node_id (int): the id of the node to check - @param node_id the id of the node to check - @return true iff the connection has failed and the node is disconnected + Returns: + bool: True iff the node exists and is disconnected """ if node_id not in self._conns: return False return self._conns[node_id].state is ConnectionStates.DISCONNECTED def is_ready(self, node_id): - """ - Check if the node with the given id is ready to send more requests. + """Check whether a node is ready to send more requests. + + In addition to connection-level checks, this method also is used to + block additional requests from being sent during a metadata refresh. + + Arguments: + node_id (int): id of the node to check - @param node_id The id of the node - @return true if the node is ready + Returns: + bool: True if the node is ready and metadata is not refreshing """ # if we need to update our metadata now declare all requests unready to # make metadata requests first priority @@ -199,12 +235,17 @@ class KafkaClient(object): return conn.connected() and conn.can_send_more() def send(self, node_id, request): - """ - Send the given request. Requests can only be sent out to ready nodes. + """Send a request to a specific node. + + Arguments: + node_id (int): destination node + request (Struct): request object (not-encoded) - @param node destination node - @param request The request - @param now The current timestamp + Raises: + IllegalStateError: if node_id is not ready + + Returns: + Future: resolves to Response struct """ if not self._can_send_request(node_id): raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id) @@ -217,15 +258,20 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) def poll(self, timeout_ms=None, future=None): - """Do actual reads and writes to sockets. - - @param timeout_ms The maximum amount of time to wait (in ms) for - responses if there are none available immediately. - Must be non-negative. The actual timeout will be the - minimum of timeout, request timeout and metadata - timeout. If unspecified, default to request_timeout_ms - @param future Optionally block until the provided future completes. - @return The list of responses received. + """Try to read and write to sockets. + + This method will also attempt to complete node connections, refresh + stale metadata, and run previously-scheduled tasks. + + Arguments: + timeout_ms (int, optional): maximum amount of time to wait (in ms) + for at least one response. Must be non-negative. The actual + timeout will be the minimum of timeout, request timeout and + metadata timeout. Default: request_timeout_ms + future (Future, optional): if provided, blocks until future.is_done + + Returns: + list: responses received (can be empty) """ if timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] @@ -283,7 +329,15 @@ class KafkaClient(object): return responses def in_flight_request_count(self, node_id=None): - """Get the number of in-flight requests""" + """Get the number of in-flight requests for a node or all nodes. + + Arguments: + node_id (int, optional): a specific node to check. If unspecified, + return the total for all nodes + + Returns: + int: pending in-flight requests for the node, or all nodes if None + """ if node_id is not None: if node_id not in self._conns: return 0 @@ -292,16 +346,17 @@ class KafkaClient(object): return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) def least_loaded_node(self): - """ - Choose the node with the fewest outstanding requests which is at least - eligible for connection. This method will prefer a node with an - existing connection, but will potentially choose a node for which we - don't yet have a connection if all existing connections are in use. - This method will never choose a node for which there is no existing - connection and from which we have disconnected within the reconnect - backoff period. - - @return The node_id with the fewest in-flight requests. + """Choose the node with fewest outstanding requests, with fallbacks. + + This method will prefer a node with an existing connection, but will + potentially choose a node for which we don't yet have a connection if + all existing connections are in use. This method will never choose a + node that was disconnected within the reconnect backoff period. + If all else fails, the method will attempt to bootstrap again using the + bootstrap_servers list. + + Returns: + node_id or None if no suitable node was found """ nodes = list(self._conns.keys()) random.shuffle(nodes) @@ -339,10 +394,13 @@ class KafkaClient(object): return None def set_topics(self, topics): - """ - Set specific topics to track for metadata + """Set specific topics to track for metadata. + + Arguments: + topics (list of str): topics to check for metadata - Returns a future that will complete after metadata request/response + Returns: + Future: resolves after metadata request/response """ if set(topics).difference(self._topics): future = self.cluster.request_update() @@ -353,7 +411,11 @@ class KafkaClient(object): # request metadata update on disconnect and timedout def _maybe_refresh_metadata(self): - """Send a metadata request if needed""" + """Send a metadata request if needed. + + Returns: + int: milliseconds until next refresh + """ ttl = self.cluster.ttl() if ttl > 0: return ttl @@ -383,26 +445,30 @@ class KafkaClient(object): return 0 def schedule(self, task, at): - """ - Schedule a new task to be executed at the given time. + """Schedule a new task to be executed at the given time. This is "best-effort" scheduling and should only be used for coarse synchronization. A task cannot be scheduled for multiple times simultaneously; any previously scheduled instance of the same task will be cancelled. - @param task The task to be scheduled -- function or implement __call__ - @param at Epoch seconds when it should run (see time.time()) - @returns Future + Arguments: + task (callable): task to be scheduled + at (float or int): epoch seconds when task should run + + Returns: + Future: resolves to result of task call, or exception if raised """ return self._delayed_tasks.add(task, at) def unschedule(self, task): - """ - Unschedule a task. This will remove all instances of the task from the task queue. + """Unschedule a task. + + This will remove all instances of the task from the task queue. This is a no-op if the task is not scheduled. - @param task The task to be unscheduled. + Arguments: + task (callable): task to be unscheduled """ self._delayed_tasks.remove(task) @@ -415,10 +481,14 @@ class DelayedTaskQueue(object): self._counter = itertools.count() # unique sequence count def add(self, task, at): - """Add a task to run at a later time + """Add a task to run at a later time. + + Arguments: + task: can be anything, but generally a callable + at (float or int): epoch seconds to schedule task - task: anything - at: seconds from epoch to schedule task (see time.time()) + Returns: + Future: a future that will be returned with the task when ready """ if task in self._task_map: self.remove(task) @@ -430,9 +500,10 @@ class DelayedTaskQueue(object): return future def remove(self, task): - """Remove a previously scheduled task + """Remove a previously scheduled task. - Raises KeyError if task is not found + Raises: + KeyError: if task is not found """ entry = self._task_map.pop(task) task, future = entry[-1] @@ -456,7 +527,7 @@ class DelayedTaskQueue(object): return (task, future) def next_at(self): - """Number of seconds until next task is ready""" + """Number of seconds until next task is ready.""" self._drop_removed() if not self._tasks: return sys.maxint |