summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-09-28 12:39:34 -0700
committerGitHub <noreply@github.com>2016-09-28 12:39:34 -0700
commit9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af (patch)
treeae679983f7206ff6d0058fa551aa4f8380612e42
parentb8717b4b79462e83344f49bbd42312cf521d84aa (diff)
downloadkafka-python-9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af.tar.gz
KAFKA-3007: KafkaConsumer max_poll_records (#831)
-rw-r--r--kafka/consumer/fetcher.py226
-rw-r--r--kafka/consumer/group.py134
-rw-r--r--test/test_consumer_group.py8
-rw-r--r--test/test_consumer_integration.py1
-rw-r--r--test/test_fetcher.py16
5 files changed, 123 insertions, 262 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f5d44b1..15fa1c9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -4,6 +4,7 @@ import collections
import copy
import logging
import random
+import sys
import time
from kafka.vendor import six
@@ -39,6 +40,7 @@ class Fetcher(six.Iterator):
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
'max_partition_fetch_bytes': 1048576,
+ 'max_poll_records': sys.maxsize,
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
@@ -92,11 +94,10 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
- self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
- def init_fetches(self):
+ def send_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
Note: noop if there are unconsumed records internal to the fetcher
@@ -104,16 +105,6 @@ class Fetcher(six.Iterator):
Returns:
List of Futures: each future resolves to a FetchResponse
"""
- # We need to be careful when creating fetch records during iteration
- # so we verify that there are no records in the deque, or in an
- # iterator
- if self._records or self._iterator:
- log.debug('Skipping init_fetches because there are unconsumed'
- ' records internally')
- return []
- return self._init_fetches()
-
- def _init_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -291,10 +282,12 @@ class Fetcher(six.Iterator):
copied_record_too_large_partitions,
self.config['max_partition_fetch_bytes'])
- def fetched_records(self):
+ def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
- Incompatible with iterator interface - use one or the other, not both.
+ Arguments:
+ max_records (int): Maximum number of records returned. Defaults
+ to max_poll_records configuration.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -304,32 +297,44 @@ class Fetcher(six.Iterator):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
- AssertionError: if used with iterator (incompatible)
- Returns:
- dict: {TopicPartition: [messages]}
+ Returns: (records (dict), partial (bool))
+ records: {TopicPartition: [messages]}
+ partial: True if records returned did not fully drain any pending
+ partition requests. This may be useful for choosing when to
+ pipeline additional fetch requests.
"""
- assert self._iterator is None, (
- 'fetched_records is incompatible with message iterator')
+ if max_records is None:
+ max_records = self.config['max_poll_records']
+ assert max_records > 0
+
if self._subscriptions.needs_partition_assignment:
- return {}
+ return {}, False
- drained = collections.defaultdict(list)
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
- # Loop over the records deque
- while self._records:
- (fetch_offset, tp, messages) = self._records.popleft()
-
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
+ drained = collections.defaultdict(list)
+ partial = bool(self._records and max_records)
+ while self._records and max_records > 0:
+ part = self._records.popleft()
+ max_records -= self._append(drained, part, max_records)
+ if part.has_more():
+ self._records.appendleft(part)
+ else:
+ partial &= False
+ return dict(drained), partial
+
+ def _append(self, drained, part, max_records):
+ tp = part.topic_partition
+ fetch_offset = part.fetch_offset
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ else:
# note that the position should always be available
# as long as the partition is still assigned
position = self._subscriptions.assignment[tp].position
@@ -340,26 +345,35 @@ class Fetcher(six.Iterator):
" %s since it is no longer fetchable", tp)
elif fetch_offset == position:
- next_offset = messages[-1][0] + 1
+ part_records = part.take(max_records)
+ if not part_records:
+ return 0
+ next_offset = part_records[-1].offset + 1
+
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s and update position to %s", position,
tp, next_offset)
- self._subscriptions.assignment[tp].position = next_offset
- for record in self._unpack_message_set(tp, messages):
+ for record in part_records:
# Fetched compressed messages may include additional records
if record.offset < fetch_offset:
log.debug("Skipping message offset: %s (expecting %s)",
record.offset, fetch_offset)
continue
drained[tp].append(record)
+
+ self._subscriptions.assignment[tp].position = next_offset
+ return len(part_records)
+
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.debug("Ignoring fetched records for %s at offset %s since"
- " the current position is %d", tp, fetch_offset,
+ " the current position is %d", tp, part.fetch_offset,
position)
- return dict(drained)
+
+ part.discard()
+ return 0
def _unpack_message_set(self, tp, messages):
try:
@@ -430,97 +444,17 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
- def _message_generator(self):
- """Iterate over fetched_records"""
- if self._subscriptions.needs_partition_assignment:
- raise StopIteration('Subscription needs partition assignment')
-
- while self._records:
-
- # Check on each iteration since this is a generator
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
-
- # Send additional FetchRequests when the internal queue is low
- # this should enable moderate pipelining
- if len(self._records) <= self.config['iterator_refetch_records']:
- self._init_fetches()
-
- (fetch_offset, tp, messages) = self._records.popleft()
-
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
- # note that the consumed position should always be available
- # as long as the partition is still assigned
- position = self._subscriptions.assignment[tp].position
- if not self._subscriptions.is_fetchable(tp):
- # this can happen when a partition consumption paused before
- # fetched records are returned
- log.debug("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
-
- elif fetch_offset == position:
- log.log(0, "Returning fetched records at offset %d for assigned"
- " partition %s", position, tp)
-
- # We can ignore any prior signal to drop pending message sets
- # because we are starting from a fresh one where fetch_offset == position
- # i.e., the user seek()'d to this position
- self._subscriptions.assignment[tp].drop_pending_message_set = False
-
- for msg in self._unpack_message_set(tp, messages):
-
- # Because we are in a generator, it is possible for
- # subscription state to change between yield calls
- # so we need to re-check on each loop
- # this should catch assignment changes, pauses
- # and resets via seek_to_beginning / seek_to_end
- if not self._subscriptions.is_fetchable(tp):
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer fetchable", tp)
- break
-
- # If there is a seek during message iteration,
- # we should stop unpacking this message set and
- # wait for a new fetch response that aligns with the
- # new seek position
- elif self._subscriptions.assignment[tp].drop_pending_message_set:
- log.debug("Skipping remainder of message set for partition %s", tp)
- self._subscriptions.assignment[tp].drop_pending_message_set = False
- break
-
- # Compressed messagesets may include earlier messages
- elif msg.offset < self._subscriptions.assignment[tp].position:
- log.debug("Skipping message offset: %s (expecting %s)",
- msg.offset,
- self._subscriptions.assignment[tp].position)
- continue
-
- self._subscriptions.assignment[tp].position = msg.offset + 1
- yield msg
- else:
- # these records aren't next in line based on the last consumed
- # position, ignore them they must be from an obsolete request
- log.debug("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
+ ret, _ = self.fetched_records(max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
def _deserialize(self, msg):
if self.config['key_deserializer']:
@@ -601,6 +535,11 @@ class Fetcher(six.Iterator):
" %s", partition, error_type)
future.failure(error_type(partition))
+ def _fetchable_partitions(self):
+ fetchable = self._subscriptions.fetchable_partitions()
+ pending = set([part.topic_partition for part in self._records])
+ return fetchable.difference(pending)
+
def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.
@@ -613,24 +552,17 @@ class Fetcher(six.Iterator):
# which can be passed to FetchRequest() via .items()
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
- # avoid re-fetching pending offsets
- pending = set()
- for fetch_offset, tp, _ in self._records:
- pending.add((tp, fetch_offset))
-
- for partition in self._subscriptions.fetchable_partitions():
+ for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
position = self._subscriptions.assignment[partition].position
- # fetch if there is a leader, no in-flight requests, and no _records
+ # fetch if there is a leader and no in-flight requests
if node_id is None or node_id == -1:
log.debug("No leader found for partition %s."
" Requesting metadata update", partition)
self._client.cluster.request_update()
- elif ((partition, position) not in pending and
- self._client.in_flight_request_count(node_id) == 0):
-
+ elif self._client.in_flight_request_count(node_id) == 0:
partition_info = (
partition.partition,
position,
@@ -704,7 +636,8 @@ class Fetcher(six.Iterator):
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
- self._records.append((fetch_offset, tp, messages))
+ unpacked = list(self._unpack_message_set(tp, messages))
+ self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
last_offset, _, _ = messages[-1]
self._sensors.records_fetch_lag.record(highwater - last_offset)
num_bytes = sum(msg[1] for msg in messages)
@@ -744,6 +677,29 @@ class Fetcher(six.Iterator):
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
+ class PartitionRecords(six.Iterator):
+ def __init__(self, fetch_offset, tp, messages):
+ self.fetch_offset = fetch_offset
+ self.topic_partition = tp
+ self.messages = messages
+ self.message_idx = 0
+
+ def discard(self):
+ self.messages = None
+
+ def take(self, n):
+ if not self.has_more():
+ return []
+ next_idx = self.message_idx + n
+ res = self.messages[self.message_idx:next_idx]
+ self.message_idx = next_idx
+ if self.has_more():
+ self.fetch_offset = self.messages[self.message_idx].offset
+ return res
+
+ def has_more(self):
+ return self.message_idx < len(self.messages)
+
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d4e0ff3..efadde1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
import copy
import logging
import socket
+import sys
import time
from kafka.vendor import six
@@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
+ max_poll_records (int): ....
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -126,7 +128,7 @@ class KafkaConsumer(six.Iterator):
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
- iterator). Default -1 (block forever).
+ iterator). Default block forever [float('inf')].
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
caused some messages to be corrupted via double-compression.
By default, the fetcher will return these messages as a compressed
@@ -220,10 +222,11 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
+ 'max_poll_records': sys.maxsize,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
- 'consumer_timeout_ms': -1,
+ 'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
@@ -295,8 +298,6 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
- self._iterator = None
- self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -483,7 +484,7 @@ class KafkaConsumer(six.Iterator):
"""
return self._client.cluster.partitions_for_topic(topic)
- def poll(self, timeout_ms=0):
+ def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -505,19 +506,15 @@ class KafkaConsumer(six.Iterator):
subscribed list of topics and partitions
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
- assert self._iterator is None, 'Incompatible with iterator interface'
+ if max_records is None:
+ max_records = self.config['max_poll_records']
# poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining)
+ records = self._poll_once(remaining, max_records)
if records:
- # before returning the fetched records, we can send off the
- # next round of fetches and avoid block waiting for their
- # responses to enable pipelining while the user is handling the
- # fetched records.
- self._fetcher.init_fetches()
return records
elapsed_ms = (time.time() - start) * 1000
@@ -526,7 +523,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
- def _poll_once(self, timeout_ms):
+ def _poll_once(self, timeout_ms, max_records):
"""
Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -545,23 +542,29 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
-
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
- # init any new fetches (won't resend pending fetches)
- records = self._fetcher.fetched_records()
-
# if data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
+ records, partial = self._fetcher.fetched_records(max_records)
if records:
+ # before returning the fetched records, we can send off the
+ # next round of fetches and avoid block waiting for their
+ # responses to enable pipelining while the user is handling the
+ # fetched records.
+ if not partial:
+ self._fetcher.send_fetches()
return records
- self._fetcher.init_fetches()
+ # send any new fetches (won't resend pending fetches)
+ self._fetcher.send_fetches()
+
self._client.poll(timeout_ms=timeout_ms, sleep=True)
- return self._fetcher.fetched_records()
+ records, _ = self._fetcher.fetched_records(max_records)
+ return records
def position(self, partition):
"""Get the offset of the next record that will be fetched
@@ -832,96 +835,17 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
- def _message_generator(self):
- assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
- while time.time() < self._consumer_timeout:
-
- if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
-
- # fetch offsets for any subscribed partitions that we arent tracking yet
- if not self._subscription.has_all_fetch_positions():
- partitions = self._subscription.missing_fetch_positions()
- self._update_fetch_positions(partitions)
-
- poll_ms = 1000 * (self._consumer_timeout - time.time())
- if not self._fetcher.in_flight_fetches():
- poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
-
- # We need to make sure we at least keep up with scheduled tasks,
- # like heartbeats, auto-commits, and metadata refreshes
- timeout_at = self._next_timeout()
-
- # Because the consumer client poll does not sleep unless blocking on
- # network IO, we need to explicitly sleep when we know we are idle
- # because we haven't been assigned any partitions to fetch / consume
- if self._use_consumer_group() and not self.assignment():
- sleep_time = max(timeout_at - time.time(), 0)
- if sleep_time > 0 and not self._client.in_flight_request_count():
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
- # Short-circuit the fetch iterator if we are already timed out
- # to avoid any unintentional interaction with fetcher setup
- if time.time() > timeout_at:
- continue
-
- for msg in self._fetcher:
- yield msg
- if time.time() > timeout_at:
- log.debug("internal iterator timeout - breaking for poll")
- break
-
- # an else block on a for loop only executes if there was no break
- # so this should only be called on a StopIteration from the fetcher
- # and we assume that it is safe to init_fetches when fetcher is done
- # i.e., there are no more records stored internally
- else:
- self._fetcher.init_fetches()
-
- def _next_timeout(self):
- timeout = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
-
- # Although the delayed_tasks timeout above should cover processing
- # HeartbeatRequests, it is still possible that HeartbeatResponses
- # are left unprocessed during a long _fetcher iteration without
- # an intermediate poll(). And because tasks are responsible for
- # rescheduling themselves, an unprocessed response will prevent
- # the next heartbeat from being sent. This check should help
- # avoid that.
- if self._use_consumer_group():
- heartbeat = time.time() + self._coordinator.heartbeat.ttl()
- timeout = min(timeout, heartbeat)
- return timeout
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
-
- self._set_consumer_timeout()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
-
- def _set_consumer_timeout(self):
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
+ ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 1acde5e..9d9be60 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
-
-
-def test_heartbeat_timeout(conn, mocker):
- mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9))
- mocker.patch('time.time', return_value = 1234)
- consumer = KafkaConsumer('foobar')
- mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
- assert consumer._next_timeout() == 1234
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9c27eee..998045f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
+ enable_auto_commit=False,
consumer_timeout_ms=TIMEOUT_MS)
# Manual assignment avoids overhead of consumer group mgmt
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 6afd547..fea3f7d 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -33,7 +33,7 @@ def fetcher(client, subscription_state):
return Fetcher(client, subscription_state, Metrics())
-def test_init_fetches(fetcher, mocker):
+def test_send_fetches(fetcher, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
@@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker):
mocker.patch.object(fetcher, '_create_fetch_requests',
return_value = dict(enumerate(fetch_requests)))
- fetcher._records.append('foobar')
- ret = fetcher.init_fetches()
- assert fetcher._create_fetch_requests.call_count == 0
- assert ret == []
- fetcher._records.clear()
-
- fetcher._iterator = 'foo'
- ret = fetcher.init_fetches()
- assert fetcher._create_fetch_requests.call_count == 0
- assert ret == []
- fetcher._iterator = None
-
- ret = fetcher.init_fetches()
+ ret = fetcher.send_fetches()
for node, request in enumerate(fetch_requests):
fetcher._client.send.assert_any_call(node, request)
assert len(ret) == len(fetch_requests)