summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 12:51:34 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 12:51:34 -0800
commite093ffefaecb59c26f2e480214f72a03ba5a49fc (patch)
tree88b4bd434b29a751b7e0adebbe4e7514a4c7e844 /kafka/client_async.py
parent1dd9e8bb05b6efc2888ac4cae8e7199b35dd633f (diff)
downloadkafka-python-e093ffefaecb59c26f2e480214f72a03ba5a49fc.tar.gz
More Docstring Improvements
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py191
1 files changed, 131 insertions, 60 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e8ab961..87d616c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -40,6 +40,33 @@ class KafkaClient(object):
}
def __init__(self, **configs):
+ """Initialize an asynchronous kafka client
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the client should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
+ """
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -128,12 +155,13 @@ class KafkaClient(object):
return state
def ready(self, node_id):
- """
- Begin connecting to the given node, return true if we are already
- connected and ready to send to that node.
+ """Check whether a node is connected and ok to send more requests.
- @param node_id The id of the node to check
- @return True if we are ready to send to the given node
+ Arguments:
+ node_id (int): the id of the node to check
+
+ Returns:
+ bool: True if we are ready to send to the given node
"""
if self.is_ready(node_id):
return True
@@ -151,7 +179,8 @@ class KafkaClient(object):
def close(self, node_id=None):
"""Closes the connection to a particular node (if there is one).
- @param node_id The id of the node
+ Arguments:
+ node_id (int): the id of the node to close
"""
if node_id is None:
for conn in self._conns.values():
@@ -163,27 +192,34 @@ class KafkaClient(object):
return
def is_disconnected(self, node_id):
+ """Check whether the node connection has been disconnected or has failed.
+ A disconnected node has either been closed or has failed. Connection
+ failures are usually transient and can be resumed in the next ready()
+ call, but there are cases where transient failures need to be caught
+ and reacted to.
- """
- Check if the connection of the node has failed, based on the connection
- state. Such connection failures are usually transient and can be resumed
- in the next ready(node) call, but there are cases where transient
- failures need to be caught and re-acted upon.
+ Arguments:
+ node_id (int): the id of the node to check
- @param node_id the id of the node to check
- @return true iff the connection has failed and the node is disconnected
+ Returns:
+ bool: True iff the node exists and is disconnected
"""
if node_id not in self._conns:
return False
return self._conns[node_id].state is ConnectionStates.DISCONNECTED
def is_ready(self, node_id):
- """
- Check if the node with the given id is ready to send more requests.
+ """Check whether a node is ready to send more requests.
+
+ In addition to connection-level checks, this method is also used to
+ block additional requests from being sent during a metadata refresh.
+
+ Arguments:
+ node_id (int): id of the node to check
- @param node_id The id of the node
- @return true if the node is ready
+ Returns:
+ bool: True if the node is ready and metadata is not refreshing
"""
# if we need to update our metadata now declare all requests unready to
# make metadata requests first priority
@@ -199,12 +235,17 @@ class KafkaClient(object):
return conn.connected() and conn.can_send_more()
def send(self, node_id, request):
- """
- Send the given request. Requests can only be sent out to ready nodes.
+ """Send a request to a specific node.
+
+ Arguments:
+ node_id (int): destination node
+ request (Struct): request object (not-encoded)
- @param node destination node
- @param request The request
- @param now The current timestamp
+ Raises:
+ IllegalStateError: if node_id is not ready
+
+ Returns:
+ Future: resolves to Response struct
"""
if not self._can_send_request(node_id):
raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
@@ -217,15 +258,20 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response)
def poll(self, timeout_ms=None, future=None):
- """Do actual reads and writes to sockets.
-
- @param timeout_ms The maximum amount of time to wait (in ms) for
- responses if there are none available immediately.
- Must be non-negative. The actual timeout will be the
- minimum of timeout, request timeout and metadata
- timeout. If unspecified, default to request_timeout_ms
- @param future Optionally block until the provided future completes.
- @return The list of responses received.
+ """Try to read and write to sockets.
+
+ This method will also attempt to complete node connections, refresh
+ stale metadata, and run previously-scheduled tasks.
+
+ Arguments:
+ timeout_ms (int, optional): maximum amount of time to wait (in ms)
+ for at least one response. Must be non-negative. The actual
+ timeout will be the minimum of timeout, request timeout and
+ metadata timeout. Default: request_timeout_ms
+ future (Future, optional): if provided, blocks until future.is_done
+
+ Returns:
+ list: responses received (can be empty)
"""
if timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
@@ -283,7 +329,15 @@ class KafkaClient(object):
return responses
def in_flight_request_count(self, node_id=None):
- """Get the number of in-flight requests"""
+ """Get the number of in-flight requests for a node or all nodes.
+
+ Arguments:
+ node_id (int, optional): a specific node to check. If unspecified,
+ return the total for all nodes
+
+ Returns:
+ int: pending in-flight requests for the node, or all nodes if None
+ """
if node_id is not None:
if node_id not in self._conns:
return 0
@@ -292,16 +346,17 @@ class KafkaClient(object):
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
def least_loaded_node(self):
- """
- Choose the node with the fewest outstanding requests which is at least
- eligible for connection. This method will prefer a node with an
- existing connection, but will potentially choose a node for which we
- don't yet have a connection if all existing connections are in use.
- This method will never choose a node for which there is no existing
- connection and from which we have disconnected within the reconnect
- backoff period.
-
- @return The node_id with the fewest in-flight requests.
+ """Choose the node with fewest outstanding requests, with fallbacks.
+
+ This method will prefer a node with an existing connection, but will
+ potentially choose a node for which we don't yet have a connection if
+ all existing connections are in use. This method will never choose a
+ node that was disconnected within the reconnect backoff period.
+ If all else fails, the method will attempt to bootstrap again using the
+ bootstrap_servers list.
+
+ Returns:
+ node_id or None if no suitable node was found
"""
nodes = list(self._conns.keys())
random.shuffle(nodes)
@@ -339,10 +394,13 @@ class KafkaClient(object):
return None
def set_topics(self, topics):
- """
- Set specific topics to track for metadata
+ """Set specific topics to track for metadata.
+
+ Arguments:
+ topics (list of str): topics to check for metadata
- Returns a future that will complete after metadata request/response
+ Returns:
+ Future: resolves after metadata request/response
"""
if set(topics).difference(self._topics):
future = self.cluster.request_update()
@@ -353,7 +411,11 @@ class KafkaClient(object):
# request metadata update on disconnect and timedout
def _maybe_refresh_metadata(self):
- """Send a metadata request if needed"""
+ """Send a metadata request if needed.
+
+ Returns:
+ int: milliseconds until next refresh
+ """
ttl = self.cluster.ttl()
if ttl > 0:
return ttl
@@ -383,26 +445,30 @@ class KafkaClient(object):
return 0
def schedule(self, task, at):
- """
- Schedule a new task to be executed at the given time.
+ """Schedule a new task to be executed at the given time.
This is "best-effort" scheduling and should only be used for coarse
synchronization. A task cannot be scheduled for multiple times
simultaneously; any previously scheduled instance of the same task
will be cancelled.
- @param task The task to be scheduled -- function or implement __call__
- @param at Epoch seconds when it should run (see time.time())
- @returns Future
+ Arguments:
+ task (callable): task to be scheduled
+ at (float or int): epoch seconds when task should run
+
+ Returns:
+ Future: resolves to result of task call, or exception if raised
"""
return self._delayed_tasks.add(task, at)
def unschedule(self, task):
- """
- Unschedule a task. This will remove all instances of the task from the task queue.
+ """Unschedule a task.
+
+ This will remove all instances of the task from the task queue.
This is a no-op if the task is not scheduled.
- @param task The task to be unscheduled.
+ Arguments:
+ task (callable): task to be unscheduled
"""
self._delayed_tasks.remove(task)
@@ -415,10 +481,14 @@ class DelayedTaskQueue(object):
self._counter = itertools.count() # unique sequence count
def add(self, task, at):
- """Add a task to run at a later time
+ """Add a task to run at a later time.
+
+ Arguments:
+ task: can be anything, but generally a callable
+ at (float or int): epoch seconds to schedule task
- task: anything
- at: seconds from epoch to schedule task (see time.time())
+ Returns:
+ Future: a future that will be returned with the task when ready
"""
if task in self._task_map:
self.remove(task)
@@ -430,9 +500,10 @@ class DelayedTaskQueue(object):
return future
def remove(self, task):
- """Remove a previously scheduled task
+ """Remove a previously scheduled task.
- Raises KeyError if task is not found
+ Raises:
+ KeyError: if task is not found
"""
entry = self._task_map.pop(task)
task, future = entry[-1]
@@ -456,7 +527,7 @@ class DelayedTaskQueue(object):
return (task, future)
def next_at(self):
- """Number of seconds until next task is ready"""
+ """Number of seconds until next task is ready."""
self._drop_removed()
if not self._tasks:
return sys.maxint