 kafka/client_async.py     | 11
 kafka/consumer/fetcher.py | 75
 kafka/consumer/group.py   | 45
 3 files changed, 96 insertions(+), 35 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 88b8ec6..577229a 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -338,17 +338,16 @@ class KafkaClient(object):
# select on reads across all connected sockets, blocking up to timeout
sockets = dict([(conn._sock, conn)
for conn in six.itervalues(self._conns)
- if (conn.state is ConnectionStates.CONNECTED
- and conn.in_flight_requests)])
+ if conn.state is ConnectionStates.CONNECTED
+ and conn.in_flight_requests])
if not sockets:
# if sockets are connecting, we can wake when they are writeable
if self._connecting:
sockets = [self._conns[node]._sock for node in self._connecting]
select.select([], sockets, [], timeout)
- # otherwise just sleep to prevent CPU spinning
- else:
- log.debug('Nothing to do in _poll -- sleeping for %s', timeout)
- time.sleep(timeout)
+ elif timeout:
+ log.warning('_poll called with a timeout, but nothing to do'
+ ' -- this can cause high CPU usage during idle')
return []
ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
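With the sleep removed from KafkaClient._poll, the caller is now expected to avoid blocking when no responses are pending; the KafkaConsumer changes later in this diff do exactly that via Fetcher.in_flight_fetches(). A minimal caller-side sketch of that pattern (the client, fetcher, and consumer_timeout names are stand-ins, not the library API):

import time

def poll_for_fetch_responses(client, fetcher, consumer_timeout):
    # Sketch only: block in poll() just long enough for pending fetches,
    # and return immediately when nothing is in flight instead of spinning.
    poll_ms = max(0, 1000 * (consumer_timeout - time.time()))
    if not fetcher.in_flight_fetches():
        poll_ms = 0  # nothing to wait for
    return client.poll(poll_ms)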
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index eb85060..f116bed 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -36,6 +36,7 @@ class Fetcher(six.Iterator):
'fetch_max_wait_ms': 500,
'max_partition_fetch_bytes': 1048576,
'check_crcs': True,
+ 'iterator_refetch_records': 1, # undocumented -- interface may change
}
def __init__(self, client, subscriptions, **configs):
@@ -80,15 +81,28 @@ class Fetcher(six.Iterator):
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
+ self._fetch_futures = collections.deque()
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
+ Note: noop if there are unconsumed records internal to the fetcher
+
Returns:
List of Futures: each future resolves to a FetchResponse
"""
+ # We need to be careful about creating new fetch requests during
+ # iteration, so we verify that there are no unconsumed records in the
+ # deque or in an active iterator
+ if self._records or self._iterator:
+ log.debug('Skipping init_fetches because there are unconsumed'
+ ' records internally')
+ return []
+ return self._init_fetches()
+
+ def _init_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -97,8 +111,23 @@ class Fetcher(six.Iterator):
future.add_callback(self._handle_fetch_response, request)
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
+ self._fetch_futures.extend(futures)
+ self._clean_done_fetch_futures()
return futures
+ def _clean_done_fetch_futures(self):
+ while True:
+ if not self._fetch_futures:
+ break
+ if not self._fetch_futures[0].is_done:
+ break
+ self._fetch_futures.popleft()
+
+ def in_flight_fetches(self):
+ """Return True if there are any unprocessed FetchRequests in flight."""
+ self._clean_done_fetch_futures()
+ return bool(self._fetch_futures)
+
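A standalone toy of the deque-pruning pattern behind _clean_done_fetch_futures and in_flight_fetches (the _FakeFuture class below is a stand-in for kafka.future.Future; only its is_done flag matters here):

import collections

class _FakeFuture(object):
    def __init__(self, is_done=False):
        self.is_done = is_done  # stands in for kafka.future.Future.is_done

fetch_futures = collections.deque([_FakeFuture(is_done=True), _FakeFuture()])

# Completed futures are popped from the left so only unresolved requests remain
while fetch_futures and fetch_futures[0].is_done:
    fetch_futures.popleft()

print(bool(fetch_futures))  # True -> one FetchRequest still awaiting a response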
def update_fetch_positions(self, partitions):
"""Update the fetch positions for the provided partitions.
@@ -301,9 +330,9 @@ class Fetcher(six.Iterator):
elif fetch_offset == position:
next_offset = messages[-1][0] + 1
- log.debug("Returning fetched records at offset %d for assigned"
- " partition %s and update position to %s", position,
- tp, next_offset)
+ log.log(0, "Returning fetched records at offset %d for assigned"
+ " partition %s and update position to %s", position,
+ tp, next_offset)
self._subscriptions.assignment[tp].position = next_offset
for record in self._unpack_message_set(tp, messages):
@@ -339,13 +368,18 @@ class Fetcher(six.Iterator):
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
+ # Send additional FetchRequests when the internal queue is low;
+ # this should enable moderate pipelining
+ if len(self._records) <= self.config['iterator_refetch_records']:
+ self._init_fetches()
+
(fetch_offset, tp, messages) = self._records.popleft()
if not self._subscriptions.is_assigned(tp):
# this can happen when a rebalance happened before
# fetched records are returned
- log.warning("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
continue
# note that the consumed position should always be available
@@ -354,23 +388,29 @@ class Fetcher(six.Iterator):
if not self._subscriptions.is_fetchable(tp):
# this can happen when a partition consumption paused before
# fetched records are returned
- log.warning("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
+ log.debug("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
elif fetch_offset == position:
+ log.log(0, "Returning fetched records at offset %d for assigned"
+ " partition %s", position, tp)
for msg in self._unpack_message_set(tp, messages):
+
+ # Because we are in a generator, it is possible for
+ # assignment to change between yield calls
+ # so we need to re-check on each loop
+ if not self._subscriptions.is_assigned(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ break
+
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else:
# these records aren't next in line based on the last consumed
# position; ignore them, they must be from an obsolete request
- log.warning("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
-
- # Send any additional FetchRequests that we can now
- # this will likely fetch each partition individually, rather than
- # fetch multiple partitions in bulk when they are on the same broker
- self.init_fetches()
+ log.debug("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
def __iter__(self): # pylint: disable=non-iterator-returned
return self
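The per-iteration is_assigned() re-check added above matters because the generator is suspended at each yield, and a rebalance can revoke the partition while the caller still holds the previous message. A toy generator (names are illustrative) showing why the check cannot be hoisted out of the loop:

def records_for(assigned, messages):
    # Re-check assignment every time we resume from yield; it may have
    # changed while the generator was suspended.
    for msg in messages:
        if 'tp0' not in assigned:
            break  # partition revoked mid-iteration; drop remaining records
        yield msg

assigned = {'tp0'}
gen = records_for(assigned, [1, 2, 3])
print(next(gen))           # 1
assigned.discard('tp0')    # simulate a rebalance between yields
print(list(gen))           # [] -- no further records are returned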
@@ -469,8 +509,7 @@ class Fetcher(six.Iterator):
def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.
- FetchRequests skipped if no leader, node has requests in flight, or we
- have not returned all previously fetched records to consumer
+ FetchRequests skipped if no leader, or node has requests in flight
Returns:
dict: {node_id: [FetchRequest,...]}
@@ -486,9 +525,7 @@ class Fetcher(six.Iterator):
" Requesting metadata update", partition)
self._client.cluster.request_update()
elif self._client.in_flight_request_count(node_id) == 0:
- # if there is a leader and no in-flight requests,
- # issue a new fetch but only fetch data for partitions whose
- # previously fetched data has been consumed
+ # fetch if there is a leader and no in-flight requests
position = self._subscriptions.assignment[partition].position
partition_info = (
partition.partition,
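For reference, the grouping described in the _create_fetch_requests docstring ("grouped by node") amounts to bucketing assigned partitions by their leader broker so each node receives a single FetchRequest. An illustrative sketch with hard-coded stand-ins for the subscription positions and cluster metadata:

import collections

positions = {('my-topic', 0): 100, ('my-topic', 1): 42}  # {partition: next offset}
leaders = {('my-topic', 0): 1, ('my-topic', 1): 2}       # {partition: leader node_id}

requests_by_node = collections.defaultdict(list)
for partition, position in positions.items():
    requests_by_node[leaders[partition]].append((partition, position))

print(dict(requests_by_node))
# {1: [(('my-topic', 0), 100)], 2: [(('my-topic', 1), 42)]}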
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 704c994..333ef64 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator):
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
+ assert self.assignment() or self.subscription() is not None
while time.time() < self._consumer_timeout:
if self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
@@ -626,21 +627,43 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
- # init any new fetches (won't resend pending fetches)
- self._fetcher.init_fetches()
- self._client.poll(
- max(0, self._consumer_timeout - time.time()) * 1000)
-
+ # We need to make sure we at least keep up with scheduled tasks,
+ # like heartbeats, auto-commits, and metadata refreshes
timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
+
+ if self.config['api_version'] >= (0, 9):
+ if not self.assignment():
+ sleep_time = max(0, timeout_at - time.time())
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ poll_ms = 1000 * (self._consumer_timeout - time.time())
+
+ # Don't bother blocking if there are no fetches in flight
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+
+ self._client.poll(poll_ms)
+
if time.time() > timeout_at:
continue
+
for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
+ log.debug("internal iterator timeout - breaking for poll")
break
+ # an else block on a for loop only executes if there was no break,
+ # so this branch is reached only when the fetcher raised StopIteration;
+ # at that point there are no more records stored internally, so it is
+ # safe to init_fetches
+ else:
+ self._fetcher.init_fetches()
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self
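A quick illustration of the for/else semantics the comment above relies on: the else body runs only when the loop exhausts its iterable without hitting break, which here corresponds to the fetcher raising StopIteration with no records left internally.

def drain(records, deadline_reached=False):
    for _record in records:
        if deadline_reached:
            outcome = 'broke out early -- skip issuing new fetches'
            break
    else:
        outcome = 'loop finished without break -- safe to issue new fetches'
    return outcome

print(drain([1, 2, 3]))                         # else branch runs
print(drain([1, 2, 3], deadline_reached=True))  # break skips the else branch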
@@ -648,17 +671,19 @@ class KafkaConsumer(six.Iterator):
if not self._iterator:
self._iterator = self._message_generator()
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
-
+ self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
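The extracted _set_consumer_timeout helper is what makes consumer_timeout_ms stop iteration: _consumer_timeout becomes a wall-clock deadline that the generator checks against time.time(). A usage sketch (topic name and bootstrap_servers are placeholders; a reachable broker is assumed):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=1000)

for message in consumer:
    print(message.topic, message.partition, message.offset)
# Reaching this point means roughly one second passed with no new records.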
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
raise NotImplementedError(