author    | Dana Powers <dana.powers@gmail.com> | 2016-09-28 12:39:34 -0700
committer | GitHub <noreply@github.com>         | 2016-09-28 12:39:34 -0700
commit    | 9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af (patch)
tree      | ae679983f7206ff6d0058fa551aa4f8380612e42
parent    | b8717b4b79462e83344f49bbd42312cf521d84aa (diff)
download  | kafka-python-9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af.tar.gz
KAFKA-3007: KafkaConsumer max_poll_records (#831)
-rw-r--r-- | kafka/consumer/fetcher.py         | 226
-rw-r--r-- | kafka/consumer/group.py           | 134
-rw-r--r-- | test/test_consumer_group.py       |   8
-rw-r--r-- | test/test_consumer_integration.py |   1
-rw-r--r-- | test/test_fetcher.py              |  16
5 files changed, 123 insertions, 262 deletions
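
For context, a minimal usage sketch of the `max_poll_records` setting this commit introduces. The broker address and topic name below are hypothetical placeholders, not taken from the patch; only the `max_poll_records` config key and the `poll(max_records=...)` argument come from the diff itself.

```python
from kafka import KafkaConsumer

# max_poll_records caps how many records a single poll() call may return;
# per this patch it defaults to sys.maxsize, i.e. effectively unbounded.
consumer = KafkaConsumer(
    'my-topic',                          # hypothetical topic name
    bootstrap_servers='localhost:9092',  # hypothetical broker address
    auto_offset_reset='earliest',
    max_poll_records=100,
)

# poll() also accepts a per-call max_records override.
batch = consumer.poll(timeout_ms=500, max_records=10)
for tp, records in batch.items():
    for record in records:
        print(tp, record.offset, record.value)

consumer.close()
```
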
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f5d44b1..15fa1c9 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -4,6 +4,7 @@ import collections import copy import logging import random +import sys import time from kafka.vendor import six @@ -39,6 +40,7 @@ class Fetcher(six.Iterator): 'fetch_min_bytes': 1, 'fetch_max_wait_ms': 500, 'max_partition_fetch_bytes': 1048576, + 'max_poll_records': sys.maxsize, 'check_crcs': True, 'skip_double_compressed_messages': False, 'iterator_refetch_records': 1, # undocumented -- interface may change @@ -92,11 +94,10 @@ class Fetcher(six.Iterator): self._unauthorized_topics = set() self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} - self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) - def init_fetches(self): + def send_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. Note: noop if there are unconsumed records internal to the fetcher @@ -104,16 +105,6 @@ class Fetcher(six.Iterator): Returns: List of Futures: each future resolves to a FetchResponse """ - # We need to be careful when creating fetch records during iteration - # so we verify that there are no records in the deque, or in an - # iterator - if self._records or self._iterator: - log.debug('Skipping init_fetches because there are unconsumed' - ' records internally') - return [] - return self._init_fetches() - - def _init_fetches(self): futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -291,10 +282,12 @@ class Fetcher(six.Iterator): copied_record_too_large_partitions, self.config['max_partition_fetch_bytes']) - def fetched_records(self): + def fetched_records(self, max_records=None): """Returns previously fetched records and updates consumed offsets. - Incompatible with iterator interface - use one or the other, not both. + Arguments: + max_records (int): Maximum number of records returned. Defaults + to max_poll_records configuration. Raises: OffsetOutOfRangeError: if no subscription offset_reset_strategy @@ -304,32 +297,44 @@ class Fetcher(six.Iterator): configured max_partition_fetch_bytes TopicAuthorizationError: if consumer is not authorized to fetch messages from the topic - AssertionError: if used with iterator (incompatible) - Returns: - dict: {TopicPartition: [messages]} + Returns: (records (dict), partial (bool)) + records: {TopicPartition: [messages]} + partial: True if records returned did not fully drain any pending + partition requests. This may be useful for choosing when to + pipeline additional fetch requests. 
""" - assert self._iterator is None, ( - 'fetched_records is incompatible with message iterator') + if max_records is None: + max_records = self.config['max_poll_records'] + assert max_records > 0 + if self._subscriptions.needs_partition_assignment: - return {} + return {}, False - drained = collections.defaultdict(list) self._raise_if_offset_out_of_range() self._raise_if_unauthorized_topics() self._raise_if_record_too_large() - # Loop over the records deque - while self._records: - (fetch_offset, tp, messages) = self._records.popleft() - - if not self._subscriptions.is_assigned(tp): - # this can happen when a rebalance happened before - # fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) - continue - + drained = collections.defaultdict(list) + partial = bool(self._records and max_records) + while self._records and max_records > 0: + part = self._records.popleft() + max_records -= self._append(drained, part, max_records) + if part.has_more(): + self._records.appendleft(part) + else: + partial &= False + return dict(drained), partial + + def _append(self, drained, part, max_records): + tp = part.topic_partition + fetch_offset = part.fetch_offset + if not self._subscriptions.is_assigned(tp): + # this can happen when a rebalance happened before + # fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) + else: # note that the position should always be available # as long as the partition is still assigned position = self._subscriptions.assignment[tp].position @@ -340,26 +345,35 @@ class Fetcher(six.Iterator): " %s since it is no longer fetchable", tp) elif fetch_offset == position: - next_offset = messages[-1][0] + 1 + part_records = part.take(max_records) + if not part_records: + return 0 + next_offset = part_records[-1].offset + 1 + log.log(0, "Returning fetched records at offset %d for assigned" " partition %s and update position to %s", position, tp, next_offset) - self._subscriptions.assignment[tp].position = next_offset - for record in self._unpack_message_set(tp, messages): + for record in part_records: # Fetched compressed messages may include additional records if record.offset < fetch_offset: log.debug("Skipping message offset: %s (expecting %s)", record.offset, fetch_offset) continue drained[tp].append(record) + + self._subscriptions.assignment[tp].position = next_offset + return len(part_records) + else: # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request log.debug("Ignoring fetched records for %s at offset %s since" - " the current position is %d", tp, fetch_offset, + " the current position is %d", tp, part.fetch_offset, position) - return dict(drained) + + part.discard() + return 0 def _unpack_message_set(self, tp, messages): try: @@ -430,97 +444,17 @@ class Fetcher(six.Iterator): log.exception('StopIteration raised unpacking messageset: %s', e) raise Exception('StopIteration raised unpacking messageset') - def _message_generator(self): - """Iterate over fetched_records""" - if self._subscriptions.needs_partition_assignment: - raise StopIteration('Subscription needs partition assignment') - - while self._records: - - # Check on each iteration since this is a generator - self._raise_if_offset_out_of_range() - self._raise_if_unauthorized_topics() - self._raise_if_record_too_large() - - # Send 
additional FetchRequests when the internal queue is low - # this should enable moderate pipelining - if len(self._records) <= self.config['iterator_refetch_records']: - self._init_fetches() - - (fetch_offset, tp, messages) = self._records.popleft() - - if not self._subscriptions.is_assigned(tp): - # this can happen when a rebalance happened before - # fetched records are returned - log.debug("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) - continue - - # note that the consumed position should always be available - # as long as the partition is still assigned - position = self._subscriptions.assignment[tp].position - if not self._subscriptions.is_fetchable(tp): - # this can happen when a partition consumption paused before - # fetched records are returned - log.debug("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) - - elif fetch_offset == position: - log.log(0, "Returning fetched records at offset %d for assigned" - " partition %s", position, tp) - - # We can ignore any prior signal to drop pending message sets - # because we are starting from a fresh one where fetch_offset == position - # i.e., the user seek()'d to this position - self._subscriptions.assignment[tp].drop_pending_message_set = False - - for msg in self._unpack_message_set(tp, messages): - - # Because we are in a generator, it is possible for - # subscription state to change between yield calls - # so we need to re-check on each loop - # this should catch assignment changes, pauses - # and resets via seek_to_beginning / seek_to_end - if not self._subscriptions.is_fetchable(tp): - log.debug("Not returning fetched records for partition %s" - " since it is no longer fetchable", tp) - break - - # If there is a seek during message iteration, - # we should stop unpacking this message set and - # wait for a new fetch response that aligns with the - # new seek position - elif self._subscriptions.assignment[tp].drop_pending_message_set: - log.debug("Skipping remainder of message set for partition %s", tp) - self._subscriptions.assignment[tp].drop_pending_message_set = False - break - - # Compressed messagesets may include earlier messages - elif msg.offset < self._subscriptions.assignment[tp].position: - log.debug("Skipping message offset: %s (expecting %s)", - msg.offset, - self._subscriptions.assignment[tp].position) - continue - - self._subscriptions.assignment[tp].position = msg.offset + 1 - yield msg - else: - # these records aren't next in line based on the last consumed - # position, ignore them they must be from an obsolete request - log.debug("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) - def __iter__(self): # pylint: disable=non-iterator-returned return self def __next__(self): - if not self._iterator: - self._iterator = self._message_generator() - try: - return next(self._iterator) - except StopIteration: - self._iterator = None - raise + ret, _ = self.fetched_records(max_records=1) + if not ret: + raise StopIteration + assert len(ret) == 1 + (messages,) = ret.values() + assert len(messages) == 1 + return messages[0] def _deserialize(self, msg): if self.config['key_deserializer']: @@ -601,6 +535,11 @@ class Fetcher(six.Iterator): " %s", partition, error_type) future.failure(error_type(partition)) + def _fetchable_partitions(self): + fetchable = self._subscriptions.fetchable_partitions() + pending = set([part.topic_partition for part in self._records]) + return fetchable.difference(pending) + def 
_create_fetch_requests(self): """Create fetch requests for all assigned partitions, grouped by node. @@ -613,24 +552,17 @@ class Fetcher(six.Iterator): # which can be passed to FetchRequest() via .items() fetchable = collections.defaultdict(lambda: collections.defaultdict(list)) - # avoid re-fetching pending offsets - pending = set() - for fetch_offset, tp, _ in self._records: - pending.add((tp, fetch_offset)) - - for partition in self._subscriptions.fetchable_partitions(): + for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) position = self._subscriptions.assignment[partition].position - # fetch if there is a leader, no in-flight requests, and no _records + # fetch if there is a leader and no in-flight requests if node_id is None or node_id == -1: log.debug("No leader found for partition %s." " Requesting metadata update", partition) self._client.cluster.request_update() - elif ((partition, position) not in pending and - self._client.in_flight_request_count(node_id) == 0): - + elif self._client.in_flight_request_count(node_id) == 0: partition_info = ( partition.partition, position, @@ -704,7 +636,8 @@ class Fetcher(six.Iterator): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, position) - self._records.append((fetch_offset, tp, messages)) + unpacked = list(self._unpack_message_set(tp, messages)) + self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked)) last_offset, _, _ = messages[-1] self._sensors.records_fetch_lag.record(highwater - last_offset) num_bytes = sum(msg[1] for msg in messages) @@ -744,6 +677,29 @@ class Fetcher(six.Iterator): self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) self._sensors.fetch_latency.record((recv_time - send_time) * 1000) + class PartitionRecords(six.Iterator): + def __init__(self, fetch_offset, tp, messages): + self.fetch_offset = fetch_offset + self.topic_partition = tp + self.messages = messages + self.message_idx = 0 + + def discard(self): + self.messages = None + + def take(self, n): + if not self.has_more(): + return [] + next_idx = self.message_idx + n + res = self.messages[self.message_idx:next_idx] + self.message_idx = next_idx + if self.has_more(): + self.fetch_offset = self.messages[self.message_idx].offset + return res + + def has_more(self): + return self.message_idx < len(self.messages) + class FetchManagerMetrics(object): def __init__(self, metrics, prefix): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d4e0ff3..efadde1 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import copy import logging import socket +import sys import time from kafka.vendor import six @@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator): rebalances. Default: 3000 session_timeout_ms (int): The timeout used to detect failures when using Kafka's group managementment facilities. Default: 30000 + max_poll_records (int): .... receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. @@ -126,7 +128,7 @@ class KafkaConsumer(six.Iterator): [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] consumer_timeout_ms (int): number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the - iterator). Default -1 (block forever). + iterator). Default block forever [float('inf')]. 
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4 caused some messages to be corrupted via double-compression. By default, the fetcher will return these messages as a compressed @@ -220,10 +222,11 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, + 'max_poll_records': sys.maxsize, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], - 'consumer_timeout_ms': -1, + 'consumer_timeout_ms': float('inf'), 'skip_double_compressed_messages': False, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, @@ -295,8 +298,6 @@ class KafkaConsumer(six.Iterator): assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False - self._iterator = None - self._consumer_timeout = float('inf') if topics: self._subscription.subscribe(topics=topics) @@ -483,7 +484,7 @@ class KafkaConsumer(six.Iterator): """ return self._client.cluster.partitions_for_topic(topic) - def poll(self, timeout_ms=0): + def poll(self, timeout_ms=0, max_records=None): """Fetch data from assigned topics / partitions. Records are fetched and returned in batches by topic-partition. @@ -505,19 +506,15 @@ class KafkaConsumer(six.Iterator): subscribed list of topics and partitions """ assert timeout_ms >= 0, 'Timeout must not be negative' - assert self._iterator is None, 'Incompatible with iterator interface' + if max_records is None: + max_records = self.config['max_poll_records'] # poll for new data until the timeout expires start = time.time() remaining = timeout_ms while True: - records = self._poll_once(remaining) + records = self._poll_once(remaining, max_records) if records: - # before returning the fetched records, we can send off the - # next round of fetches and avoid block waiting for their - # responses to enable pipelining while the user is handling the - # fetched records. - self._fetcher.init_fetches() return records elapsed_ms = (time.time() - start) * 1000 @@ -526,7 +523,7 @@ class KafkaConsumer(six.Iterator): if remaining <= 0: return {} - def _poll_once(self, timeout_ms): + def _poll_once(self, timeout_ms, max_records): """ Do one round of polling. In addition to checking for new data, this does any needed heart-beating, auto-commits, and offset updates. @@ -545,23 +542,29 @@ class KafkaConsumer(six.Iterator): elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): self._coordinator.ensure_coordinator_known() - # fetch positions if we have partitions we're subscribed to that we # don't know the offset for if not self._subscription.has_all_fetch_positions(): self._update_fetch_positions(self._subscription.missing_fetch_positions()) - # init any new fetches (won't resend pending fetches) - records = self._fetcher.fetched_records() - # if data is available already, e.g. from a previous network client # poll() call to commit, then just return it immediately + records, partial = self._fetcher.fetched_records(max_records) if records: + # before returning the fetched records, we can send off the + # next round of fetches and avoid block waiting for their + # responses to enable pipelining while the user is handling the + # fetched records. 
+ if not partial: + self._fetcher.send_fetches() return records - self._fetcher.init_fetches() + # send any new fetches (won't resend pending fetches) + self._fetcher.send_fetches() + self._client.poll(timeout_ms=timeout_ms, sleep=True) - return self._fetcher.fetched_records() + records, _ = self._fetcher.fetched_records(max_records) + return records def position(self, partition): """Get the offset of the next record that will be fetched @@ -832,96 +835,17 @@ class KafkaConsumer(six.Iterator): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) - def _message_generator(self): - assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' - while time.time() < self._consumer_timeout: - - if self._use_consumer_group(): - self._coordinator.ensure_coordinator_known() - self._coordinator.ensure_active_group() - - # 0.8.2 brokers support kafka-backed offset storage via group coordinator - elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() - - # fetch offsets for any subscribed partitions that we arent tracking yet - if not self._subscription.has_all_fetch_positions(): - partitions = self._subscription.missing_fetch_positions() - self._update_fetch_positions(partitions) - - poll_ms = 1000 * (self._consumer_timeout - time.time()) - if not self._fetcher.in_flight_fetches(): - poll_ms = 0 - self._client.poll(timeout_ms=poll_ms, sleep=True) - - # We need to make sure we at least keep up with scheduled tasks, - # like heartbeats, auto-commits, and metadata refreshes - timeout_at = self._next_timeout() - - # Because the consumer client poll does not sleep unless blocking on - # network IO, we need to explicitly sleep when we know we are idle - # because we haven't been assigned any partitions to fetch / consume - if self._use_consumer_group() and not self.assignment(): - sleep_time = max(timeout_at - time.time(), 0) - if sleep_time > 0 and not self._client.in_flight_request_count(): - log.debug('No partitions assigned; sleeping for %s', sleep_time) - time.sleep(sleep_time) - continue - - # Short-circuit the fetch iterator if we are already timed out - # to avoid any unintentional interaction with fetcher setup - if time.time() > timeout_at: - continue - - for msg in self._fetcher: - yield msg - if time.time() > timeout_at: - log.debug("internal iterator timeout - breaking for poll") - break - - # an else block on a for loop only executes if there was no break - # so this should only be called on a StopIteration from the fetcher - # and we assume that it is safe to init_fetches when fetcher is done - # i.e., there are no more records stored internally - else: - self._fetcher.init_fetches() - - def _next_timeout(self): - timeout = min(self._consumer_timeout, - self._client._delayed_tasks.next_at() + time.time(), - self._client.cluster.ttl() / 1000.0 + time.time()) - - # Although the delayed_tasks timeout above should cover processing - # HeartbeatRequests, it is still possible that HeartbeatResponses - # are left unprocessed during a long _fetcher iteration without - # an intermediate poll(). And because tasks are responsible for - # rescheduling themselves, an unprocessed response will prevent - # the next heartbeat from being sent. This check should help - # avoid that. 
- if self._use_consumer_group(): - heartbeat = time.time() + self._coordinator.heartbeat.ttl() - timeout = min(timeout, heartbeat) - return timeout - def __iter__(self): # pylint: disable=non-iterator-returned return self def __next__(self): - if not self._iterator: - self._iterator = self._message_generator() - - self._set_consumer_timeout() - try: - return next(self._iterator) - except StopIteration: - self._iterator = None - raise - - def _set_consumer_timeout(self): - # consumer_timeout_ms can be used to stop iteration early - if self.config['consumer_timeout_ms'] >= 0: - self._consumer_timeout = time.time() + ( - self.config['consumer_timeout_ms'] / 1000.0) + ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1) + if not ret: + raise StopIteration + assert len(ret) == 1 + (messages,) = ret.values() + assert len(messages) == 1 + return messages[0] # old KafkaConsumer methods are deprecated def configure(self, **configs): diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 1acde5e..9d9be60 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic): consumer.unsubscribe() assert set() == consumer.paused() - - -def test_heartbeat_timeout(conn, mocker): - mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9)) - mocker.patch('time.time', return_value = 1234) - consumer = KafkaConsumer('foobar') - mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0) - assert consumer._next_timeout() == 1234 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9c27eee..998045f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', + enable_auto_commit=False, consumer_timeout_ms=TIMEOUT_MS) # Manual assignment avoids overhead of consumer group mgmt diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 6afd547..fea3f7d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -33,7 +33,7 @@ def fetcher(client, subscription_state): return Fetcher(client, subscription_state, Metrics()) -def test_init_fetches(fetcher, mocker): +def test_send_fetches(fetcher, mocker): fetch_requests = [ FetchRequest[0]( -1, fetcher.config['fetch_max_wait_ms'], @@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker): mocker.patch.object(fetcher, '_create_fetch_requests', return_value = dict(enumerate(fetch_requests))) - fetcher._records.append('foobar') - ret = fetcher.init_fetches() - assert fetcher._create_fetch_requests.call_count == 0 - assert ret == [] - fetcher._records.clear() - - fetcher._iterator = 'foo' - ret = fetcher.init_fetches() - assert fetcher._create_fetch_requests.call_count == 0 - assert ret == [] - fetcher._iterator = None - - ret = fetcher.init_fetches() + ret = fetcher.send_fetches() for node, request in enumerate(fetch_requests): fetcher._client.send.assert_any_call(node, request) assert len(ret) == len(fetch_requests) |
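
To make the new buffering model concrete, here is a simplified, standalone sketch of the `PartitionRecords` idea added to `kafka/consumer/fetcher.py`. The `Record` namedtuple and the sample offsets are made up for illustration; the real class also carries the `TopicPartition` and supports `discard()`.

```python
import collections

# Stand-in for a fetched message; the real fetcher yields richer record objects.
Record = collections.namedtuple('Record', ['offset', 'value'])


class PartitionRecords(object):
    """Buffered records for one partition, drained in bounded slices."""

    def __init__(self, fetch_offset, messages):
        self.fetch_offset = fetch_offset
        self.messages = messages
        self.message_idx = 0

    def take(self, n):
        if not self.has_more():
            return []
        next_idx = self.message_idx + n
        res = self.messages[self.message_idx:next_idx]
        self.message_idx = next_idx
        if self.has_more():
            # Track where the next take() resumes, mirroring the patch.
            self.fetch_offset = self.messages[self.message_idx].offset
        return res

    def has_more(self):
        return self.message_idx < len(self.messages)


part = PartitionRecords(0, [Record(i, 'msg-%d' % i) for i in range(5)])
while part.has_more():
    chunk = part.take(2)  # at most max_poll_records per drain
    print([r.offset for r in chunk], 'next fetch_offset:', part.fetch_offset)
```

Draining in bounded slices like this is what lets `fetched_records()` honor `max_poll_records` without discarding the remainder of an already-fetched message set, and the `partial` flag it returns tells `poll()` whether it is safe to pipeline the next round of fetches.
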