diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-21 09:29:55 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-12-21 11:39:25 -0800 |
commit | 21484ca84c2c24ea9f29bb2b5ae705d4f598db9f (patch) | |
tree | d7de3b7d2b86a657f6d5e1d86f1514a90ea1186a | |
parent | 7fa4560dafe13697acbe2d2b6e99c7b0659d0786 (diff) | |
download | kafka-python-21484ca84c2c24ea9f29bb2b5ae705d4f598db9f.tar.gz |
Updates from review
-rw-r--r-- | kafka/client_async.py | 3 | ||||
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 8 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 17 | ||||
-rw-r--r-- | kafka/coordinator/heartbeat.py | 4 | ||||
-rw-r--r-- | kafka/errors.py | 7 |
6 files changed, 16 insertions, 25 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 590694f..24162ad 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -735,14 +735,13 @@ class KafkaClient(object): self._topics.add(topic) return self.cluster.request_update() - # request metadata update on disconnect and timedout + # This method should be locked when running multi-threaded def _maybe_refresh_metadata(self): """Send a metadata request if needed. Returns: int: milliseconds until next refresh """ - # This should be locked when running multi-threaded ttl = self.cluster.ttl() wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0 metadata_timeout = max(ttl, wait_for_in_progress_ms) diff --git a/kafka/conn.py b/kafka/conn.py index 68f2659..2b1008b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -685,7 +685,7 @@ class BrokerConnection(object): def recv(self): """Non-blocking network receive. - Return list of (response, future) + Return list of (response, future) tuples """ if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0ed820e..431e52f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -292,10 +292,10 @@ class BaseCoordinator(object): """ Check the status of the heartbeat thread (if it is active) and indicate the liveness of the client. This must be called periodically after - joining with :meth:`.ensureActiveGroup` to ensure that the member stays + joining with :meth:`.ensure_active_group` to ensure that the member stays in the group. If an interval of time longer than the provided rebalance - timeout expires without calling this method, then the client will - proactively leave the group. + timeout (max_poll_interval_ms) expires without calling this method, then + the client will proactively leave the group. Raises: RuntimeError for unexpected errors raised from the heartbeat thread """ @@ -330,7 +330,7 @@ class BaseCoordinator(object): self._generation.protocol, member_assignment_bytes) - def _handle_join_failure(self, exception): + def _handle_join_failure(self, _): with self._lock: self.join_future = None self.state = MemberState.UNJOINED diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 4285545..51fff23 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -251,7 +251,7 @@ class ConsumerCoordinator(BaseCoordinator): # track of the fact that we need to rebalance again to reflect the # change to the topic subscription. Without ensuring that the # metadata is fresh, any metadata update that changes the topic - # subscriptions and arrives with a rebalance in progress will + # subscriptions and arrives while a rebalance is in progress will # essentially be ignored. See KAFKA-3949 for the complete # description of the problem. if self._subscription.subscribed_pattern: @@ -264,11 +264,7 @@ class ConsumerCoordinator(BaseCoordinator): self._maybe_auto_commit_offsets_async() def time_to_next_poll(self): - """ - Return the time to the next needed invocation of {@link #poll(long)}. - @param now current time in milliseconds - @return the maximum time in milliseconds the caller should wait before the next invocation of poll() - """ + """Return seconds (float) remaining until :meth:`.poll` should be called again""" if not self.config['enable_auto_commit']: return self.time_to_next_heartbeat() @@ -396,12 +392,9 @@ class ConsumerCoordinator(BaseCoordinator): super(ConsumerCoordinator, self).close() def _invoke_completed_offset_commit_callbacks(self): - try: - while True: - callback, offsets, exception = self.completed_offset_commits.popleft() - callback(offsets, exception) - except IndexError: - pass + while self.completed_offset_commits: + callback, offsets, exception = self.completed_offset_commits.popleft() + callback(offsets, exception) def commit_offsets_async(self, offsets, callback=None): """Commit specific offsets asynchronously. diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index 5cfbac5..2f5930b 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -44,6 +44,7 @@ class Heartbeat(object): self.last_receive = time.time() def time_to_next_heartbeat(self): + """Returns seconds (float) remaining before next heartbeat should be sent""" time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) if self.heartbeat_failed: delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000 @@ -58,9 +59,6 @@ class Heartbeat(object): last_recv = max(self.last_receive, self.last_reset) return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000) - def interval(self): - return self.config['heartbeat_interval_ms'] / 1000 - def reset_timeouts(self): self.last_reset = time.time() self.last_poll = time.time() diff --git a/kafka/errors.py b/kafka/errors.py index 39dd623..c70853c 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -64,11 +64,12 @@ class CommitFailedError(KafkaError): """Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() - was longer than the configured max.poll.interval.ms, which + was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by - increasing the session timeout or by reducing the maximum - size of batches returned in poll() with max.poll.records. + increasing the rebalance timeout with max_poll_interval_ms, + or by reducing the maximum size of batches returned in poll() + with max_poll_records. """, *args, **kwargs) |