diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e94b65d..1513f39 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -10,7 +10,7 @@ import threading # selectors in stdlib as of py3.4 try: - import selectors # pylint: disable=import-error + import selectors # pylint: disable=import-error except ImportError: # vendored backport module from .vendor import selectors34 as selectors @@ -175,7 +175,7 @@ class KafkaClient(object): self.config['api_version'], str(self.API_VERSIONS))) self.cluster = ClusterMetadata(**self.config) - self._topics = set() # empty set will fetch all topic metadata + self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False self._last_no_node_available_ms = 0 self._selector = self.config['selector']() @@ -343,7 +343,7 @@ class KafkaClient(object): return self._conns[node_id].connected() def close(self, node_id=None): - """Closes one or all broker connections. + """Close one or all broker connections. Arguments: node_id (int, optional): the id of the node to close @@ -381,7 +381,7 @@ class KafkaClient(object): def connection_delay(self, node_id): """ - Returns the number of milliseconds to wait, based on the connection + Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting, returns 0 to allow non-blocking connect to finish. When connected, returns a very large @@ -507,7 +507,7 @@ class KafkaClient(object): metadata_timeout_ms, self._delayed_tasks.next_at() * 1000, self.config['request_timeout_ms']) - timeout = max(0, timeout / 1000.0) # avoid negative timeouts + timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout, sleep=sleep)) @@ -562,7 +562,7 @@ class KafkaClient(object): # Accumulate as many responses as the connection has pending while conn.in_flight_requests: - response = conn.recv() # Note: conn.recv runs callbacks / errbacks + response = conn.recv() # Note: conn.recv runs callbacks / errbacks # Incomplete responses are buffered internally # while conn.in_flight_requests retains the request @@ -770,9 +770,9 @@ class KafkaClient(object): self._delayed_tasks.remove(task) def check_version(self, node_id=None, timeout=2, strict=False): - """Attempt to guess a broker version + """Attempt to guess the version of a Kafka broker. - Note: it is possible that this method blocks longer than the + Note: It is possible that this method blocks longer than the specified timeout. This can happen if the entire cluster is down and the client enters a bootstrap backoff sleep. This is only possible if node_id is None. @@ -831,9 +831,9 @@ class KafkaClient(object): class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html def __init__(self): - self._tasks = [] # list of entries arranged in a heap - self._task_map = {} # mapping of tasks to entries - self._counter = itertools.count() # unique sequence count + self._tasks = [] # list of entries arranged in a heap + self._task_map = {} # mapping of tasks to entries + self._counter = itertools.count() # unique sequence count def add(self, task, at): """Add a task to run at a later time. |