author     Dana Powers <dana.powers@gmail.com>  2017-10-21 09:29:55 -0700
committer  Dana Powers <dana.powers@gmail.com>  2017-12-21 11:39:25 -0800
commit     21484ca84c2c24ea9f29bb2b5ae705d4f598db9f (patch)
tree       d7de3b7d2b86a657f6d5e1d86f1514a90ea1186a
parent     7fa4560dafe13697acbe2d2b6e99c7b0659d0786 (diff)
download   kafka-python-21484ca84c2c24ea9f29bb2b5ae705d4f598db9f.tar.gz
Updates from review
-rw-r--r--  kafka/client_async.py            3
-rw-r--r--  kafka/conn.py                    2
-rw-r--r--  kafka/coordinator/base.py        8
-rw-r--r--  kafka/coordinator/consumer.py   17
-rw-r--r--  kafka/coordinator/heartbeat.py   4
-rw-r--r--  kafka/errors.py                  7
6 files changed, 16 insertions, 25 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 590694f..24162ad 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -735,14 +735,13 @@ class KafkaClient(object):
self._topics.add(topic)
return self.cluster.request_update()
- # request metadata update on disconnect and timedout
+ # This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self):
"""Send a metadata request if needed.
Returns:
int: milliseconds until next refresh
"""
- # This should be locked when running multi-threaded
ttl = self.cluster.ttl()
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
metadata_timeout = max(ttl, wait_for_in_progress_ms)
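Moving the comment above the def makes the threading contract part of the method's contract: callers must serialize entry when running multi-threaded. A minimal sketch of that lock-plus-TTL shape, using a hypothetical MetadataTracker rather than kafka-python's real client:

    import threading
    import time

    class MetadataTracker(object):
        """Hypothetical TTL-based metadata cache guarded by a lock."""

        def __init__(self, metadata_max_age_ms=300000):
            self._lock = threading.Lock()
            self._metadata_max_age_ms = metadata_max_age_ms
            self._last_refresh = 0.0
            self._refresh_in_progress = False

        def maybe_refresh(self, request_timeout_ms=30000):
            """Return ms until the next refresh is needed (0 = refresh now).

            Mirrors the shape of _maybe_refresh_metadata: take the max of
            the cache TTL and the in-flight request timeout, under the
            lock that the relocated comment calls for.
            """
            with self._lock:
                age_ms = (time.time() - self._last_refresh) * 1000
                ttl = max(0, self._metadata_max_age_ms - age_ms)
                wait_for_in_progress_ms = request_timeout_ms if self._refresh_in_progress else 0
                metadata_timeout = max(ttl, wait_for_in_progress_ms)
                if metadata_timeout == 0:
                    self._refresh_in_progress = True
                    # ... send a MetadataRequest to a broker here ...
                return metadata_timeout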
diff --git a/kafka/conn.py b/kafka/conn.py
index 68f2659..2b1008b 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -685,7 +685,7 @@ class BrokerConnection(object):
def recv(self):
"""Non-blocking network receive.
- Return list of (response, future)
+ Return list of (response, future) tuples
"""
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
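The docstring now says what recv() actually yields: (response, future) tuples. A small sketch of a caller draining them; `conn` is assumed to be a connected BrokerConnection, and Future.success() is what fires the callbacks registered at send time:

    def drain_responses(conn):
        """Complete the future paired with each decoded response.

        recv() is non-blocking and returns an empty list when nothing
        is ready, so this is safe to call from a poll loop.
        """
        for response, future in conn.recv():
            future.success(response)  # fires callbacks added via future.add_callback()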
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 0ed820e..431e52f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -292,10 +292,10 @@ class BaseCoordinator(object):
"""
Check the status of the heartbeat thread (if it is active) and indicate
the liveness of the client. This must be called periodically after
- joining with :meth:`.ensureActiveGroup` to ensure that the member stays
+ joining with :meth:`.ensure_active_group` to ensure that the member stays
in the group. If an interval of time longer than the provided rebalance
- timeout expires without calling this method, then the client will
- proactively leave the group.
+ timeout (max_poll_interval_ms) expires without calling this method, then
+ the client will proactively leave the group.
Raises: RuntimeError for unexpected errors raised from the heartbeat thread
"""
@@ -330,7 +330,7 @@ class BaseCoordinator(object):
self._generation.protocol,
member_assignment_bytes)
- def _handle_join_failure(self, exception):
+ def _handle_join_failure(self, _):
with self._lock:
self.join_future = None
self.state = MemberState.UNJOINED
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 4285545..51fff23 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -251,7 +251,7 @@ class ConsumerCoordinator(BaseCoordinator):
# track of the fact that we need to rebalance again to reflect the
# change to the topic subscription. Without ensuring that the
# metadata is fresh, any metadata update that changes the topic
- # subscriptions and arrives with a rebalance in progress will
+ # subscriptions and arrives while a rebalance is in progress will
# essentially be ignored. See KAFKA-3949 for the complete
# description of the problem.
if self._subscription.subscribed_pattern:
@@ -264,11 +264,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._maybe_auto_commit_offsets_async()
def time_to_next_poll(self):
- """
- Return the time to the next needed invocation of {@link #poll(long)}.
- @param now current time in milliseconds
- @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
- """
+ """Return seconds (float) remaining until :meth:`.poll` should be called again"""
if not self.config['enable_auto_commit']:
return self.time_to_next_heartbeat()
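The new docstring also fixes the units: the return value is seconds as a float, not the milliseconds the old javadoc-style comment described. A sketch of how a caller might consume it; `coordinator` is assumed to be a ConsumerCoordinator instance:

    import time

    def idle_until_next_poll(coordinator):
        """Sleep no longer than the coordinator allows.

        time_to_next_poll() returns seconds as a float, so it can be
        passed straight to time.sleep() or used as a selector timeout.
        """
        delay = coordinator.time_to_next_poll()
        time.sleep(min(delay, 1.0))  # cap the sleep so shutdown stays responsive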
@@ -396,12 +392,9 @@ class ConsumerCoordinator(BaseCoordinator):
super(ConsumerCoordinator, self).close()
def _invoke_completed_offset_commit_callbacks(self):
- try:
- while True:
- callback, offsets, exception = self.completed_offset_commits.popleft()
- callback(offsets, exception)
- except IndexError:
- pass
+ while self.completed_offset_commits:
+ callback, offsets, exception = self.completed_offset_commits.popleft()
+ callback(offsets, exception)
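The rewrite drains the deque by truthiness instead of using IndexError for control flow. A self-contained sketch of the same pattern; note that popleft() itself is atomic, but the check-then-pop pair assumes a single draining thread, presumably safe here because only the caller's own thread invokes these callbacks:

    from collections import deque

    def on_commit(offsets, exception):
        print(offsets, exception)

    completed_offset_commits = deque()
    completed_offset_commits.append((on_commit, {'tp0': 42}, None))

    # Same drain as the rewritten method: loop while the deque is
    # non-empty rather than popping until IndexError.
    while completed_offset_commits:
        callback, offsets, exception = completed_offset_commits.popleft()
        callback(offsets, exception)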
def commit_offsets_async(self, offsets, callback=None):
"""Commit specific offsets asynchronously.
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 5cfbac5..2f5930b 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -44,6 +44,7 @@ class Heartbeat(object):
self.last_receive = time.time()
def time_to_next_heartbeat(self):
+ """Returns seconds (float) remaining before next heartbeat should be sent"""
time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset)
if self.heartbeat_failed:
delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000
@@ -58,9 +59,6 @@ class Heartbeat(object):
last_recv = max(self.last_receive, self.last_reset)
return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000)
- def interval(self):
- return self.config['heartbeat_interval_ms'] / 1000
-
def reset_timeouts(self):
self.last_reset = time.time()
self.last_poll = time.time()
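With interval() removed, all of the timing logic lives in time_to_next_heartbeat. A condensed, self-contained sketch of that arithmetic; the class below follows the names in the diff but is not the library's Heartbeat class:

    import time

    class HeartbeatTimer(object):
        """Sketch of the timing arithmetic shown in the diff above."""

        def __init__(self, heartbeat_interval_ms=3000, retry_backoff_ms=100,
                     session_timeout_ms=10000):
            now = time.time()
            self.heartbeat_interval_ms = heartbeat_interval_ms
            self.retry_backoff_ms = retry_backoff_ms
            self.session_timeout_ms = session_timeout_ms
            self.last_send = self.last_receive = self.last_reset = now
            self.heartbeat_failed = False

        def time_to_next_heartbeat(self):
            """Seconds (float) until the next heartbeat should be sent."""
            since_last = time.time() - max(self.last_send, self.last_reset)
            if self.heartbeat_failed:
                delay = self.retry_backoff_ms / 1000.0  # back off after a failure
            else:
                delay = self.heartbeat_interval_ms / 1000.0
            return max(0, delay - since_last)

        def session_timeout_expired(self):
            last_recv = max(self.last_receive, self.last_reset)
            return (time.time() - last_recv) > (self.session_timeout_ms / 1000.0)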
diff --git a/kafka/errors.py b/kafka/errors.py
index 39dd623..c70853c 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -64,11 +64,12 @@ class CommitFailedError(KafkaError):
"""Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
- was longer than the configured max.poll.interval.ms, which
+ was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
- increasing the session timeout or by reducing the maximum
- size of batches returned in poll() with max.poll.records.
+ increasing the rebalance timeout with max_poll_interval_ms,
+ or by reducing the maximum size of batches returned in poll()
+ with max_poll_records.
""", *args, **kwargs)