| author | Dana Powers <dana.powers@gmail.com> | 2016-09-16 09:31:19 -0700 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-09-19 21:07:14 -0700 |
| commit | aed27505015fdb3ca69f9565d4b1e3a159f1ae89 (patch) | |
| tree | 4a69cb765abcc2cf21d29359b8af2a87557f04a9 | |
| parent | 6e3eddc267e6d0e2c3d64d821ff03de9a2c52f53 (diff) | |
| download | kafka-python-aed27505015fdb3ca69f9565d4b1e3a159f1ae89.tar.gz | |
Greedy unpack messagesets; simplify fetch and consumer iterators -- just call poll() w/ 1 max record
-rw-r--r-- | kafka/consumer/fetcher.py | 136 |
-rw-r--r-- | kafka/consumer/group.py | 117 |
2 files changed, 42 insertions, 211 deletions
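
The user-visible effect of this change: `KafkaConsumer.poll()` gains an optional `max_records` argument (defaulting to the `max_poll_records` configuration), and plain iteration over the consumer becomes a thin wrapper that polls for one record at a time instead of driving a separate generator. A minimal usage sketch; the broker address and topic name are placeholders, not taken from the commit:

```python
from kafka import KafkaConsumer

# Hypothetical cluster details -- adjust for your environment.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=10000)

# Batch interface: cap how many records a single poll() call may return.
batch = consumer.poll(timeout_ms=1000, max_records=10)
for tp, records in batch.items():
    for record in records:
        print(tp, record.offset, record.value)

# Iterator interface: after this commit, each next(consumer) is effectively
# poll(timeout_ms=consumer_timeout_ms, max_records=1).
for record in consumer:
    print(record.offset, record.value)
```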
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 21c0b30..a237c0c 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -94,7 +94,6 @@ class Fetcher(six.Iterator):
         self._unauthorized_topics = set()
         self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
         self._record_too_large_partitions = dict() # {topic_partition: offset}
-        self._iterator = None
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -106,16 +105,6 @@ class Fetcher(six.Iterator):
         Returns:
             List of Futures: each future resolves to a FetchResponse
         """
-        # We need to be careful when creating fetch records during iteration
-        # so we verify that there are no records in the deque, or in an
-        # iterator
-        if self._records or self._iterator:
-            log.debug('Skipping send_fetches because there are unconsumed'
-                      ' records internally')
-            return []
-        return self._send_fetches()
-
-    def _send_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -293,10 +282,12 @@ class Fetcher(six.Iterator):
                 copied_record_too_large_partitions,
                 self.config['max_partition_fetch_bytes'])

-    def fetched_records(self):
+    def fetched_records(self, max_records=None):
         """Returns previously fetched records and updates consumed offsets.

-        Incompatible with iterator interface - use one or the other, not both.
+        Arguments:
+            max_records (int): Maximum number of records returned. Defaults
+                to max_poll_records configuration.

         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -306,22 +297,21 @@
                 configured max_partition_fetch_bytes
             TopicAuthorizationError: if consumer is not authorized to fetch
                 messages from the topic
-            AssertionError: if used with iterator (incompatible)

         Returns:
             dict: {TopicPartition: [messages]}
         """
-        assert self._iterator is None, (
-            'fetched_records is incompatible with message iterator')
+        if max_records is None:
+            max_records = self.config['max_poll_records']
+
         if self._subscriptions.needs_partition_assignment:
             return {}

-        drained = collections.defaultdict(list)
         self._raise_if_offset_out_of_range()
         self._raise_if_unauthorized_topics()
         self._raise_if_record_too_large()

-        max_records = self.config['max_poll_records']
+        drained = collections.defaultdict(list)
         while self._records and max_records > 0:
             part = self._records.popleft()
             max_records -= self._append(drained, part, max_records)
@@ -349,13 +339,15 @@
        elif fetch_offset == position:
            part_records = part.take(max_records)
-            next_offset = part_records[-1][0] + 1
+            if not part_records:
+                return 0
+            next_offset = part_records[-1].offset + 1

            log.log(0, "Returning fetched records at offset %d for assigned"
                       " partition %s and update position to %s", position,
                       tp, next_offset)

-            for record in self._unpack_message_set(tp, part_records):
+            for record in part_records:
                # Fetched compressed messages may include additional records
                if record.offset < fetch_offset:
                    log.debug("Skipping message offset: %s (expecting %s)",
@@ -372,6 +364,7 @@
            log.debug("Ignoring fetched records for %s at offset %s since"
                      " the current position is %d", tp, part.fetch_offset,
                      position)
+            part.discard()
        return 0
@@ -444,98 +437,17 @@
            log.exception('StopIteration raised unpacking messageset: %s', e)
            raise Exception('StopIteration raised unpacking messageset')

-    def _message_generator(self):
-        """Iterate over fetched_records"""
-        if self._subscriptions.needs_partition_assignment:
-            raise StopIteration('Subscription needs partition assignment')
-
-        while self._records:
-
-            # Check on each iteration since this is a generator
-            self._raise_if_offset_out_of_range()
-            self._raise_if_unauthorized_topics()
-            self._raise_if_record_too_large()
-
-            # Send additional FetchRequests when the internal queue is low
-            # this should enable moderate pipelining
-            if len(self._records) <= self.config['iterator_refetch_records']:
-                self._send_fetches()
-
-            part = self._records.popleft()
-            tp = part.topic_partition
-
-            if not self._subscriptions.is_assigned(tp):
-                # this can happen when a rebalance happened before
-                # fetched records are returned
-                log.debug("Not returning fetched records for partition %s"
-                          " since it is no longer assigned", tp)
-                continue
-
-            # note that the consumed position should always be available
-            # as long as the partition is still assigned
-            position = self._subscriptions.assignment[tp].position
-            if not self._subscriptions.is_fetchable(tp):
-                # this can happen when a partition consumption paused before
-                # fetched records are returned
-                log.debug("Not returning fetched records for assigned partition"
-                          " %s since it is no longer fetchable", tp)
-
-            elif part.fetch_offset == position:
-                log.log(0, "Returning fetched records at offset %d for assigned"
-                           " partition %s", position, tp)
-
-                # We can ignore any prior signal to drop pending message sets
-                # because we are starting from a fresh one where fetch_offset == position
-                # i.e., the user seek()'d to this position
-                self._subscriptions.assignment[tp].drop_pending_message_set = False
-
-                for msg in self._unpack_message_set(tp, part.messages):
-
-                    # Because we are in a generator, it is possible for
-                    # subscription state to change between yield calls
-                    # so we need to re-check on each loop
-                    # this should catch assignment changes, pauses
-                    # and resets via seek_to_beginning / seek_to_end
-                    if not self._subscriptions.is_fetchable(tp):
-                        log.debug("Not returning fetched records for partition %s"
-                                  " since it is no longer fetchable", tp)
-                        break
-
-                    # If there is a seek during message iteration,
-                    # we should stop unpacking this message set and
-                    # wait for a new fetch response that aligns with the
-                    # new seek position
-                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
-                        log.debug("Skipping remainder of message set for partition %s", tp)
-                        self._subscriptions.assignment[tp].drop_pending_message_set = False
-                        break
-
-                    # Compressed messagesets may include earlier messages
-                    elif msg.offset < self._subscriptions.assignment[tp].position:
-                        log.debug("Skipping message offset: %s (expecting %s)",
-                                  msg.offset,
-                                  self._subscriptions.assignment[tp].position)
-                        continue
-
-                    self._subscriptions.assignment[tp].position = msg.offset + 1
-                    yield msg
-            else:
-                # these records aren't next in line based on the last consumed
-                # position, ignore them they must be from an obsolete request
-                log.debug("Ignoring fetched records for %s at offset %s",
-                          tp, part.fetch_offset)
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self

     def __next__(self):
-        if not self._iterator:
-            self._iterator = self._message_generator()
-        try:
-            return next(self._iterator)
-        except StopIteration:
-            self._iterator = None
-            raise
+        ret = self.fetched_records(max_records=1)
+        if not ret:
+            raise StopIteration
+        assert len(ret.keys()) == 1
+        (messages,) = ret.values()
+        assert len(messages) == 1
+        return messages[0]

     def _deserialize(self, msg):
         if self.config['key_deserializer']:
@@ -717,7 +629,8 @@ class Fetcher(six.Iterator):
             log.debug("Adding fetched record for partition %s with"
                       " offset %d to buffered record list", tp,
                       position)
-            self._records.append(self.PartitionRecords(fetch_offset, tp, messages))
+            unpacked = list(self._unpack_message_set(tp, messages))
+            self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
             last_offset, _, _ = messages[-1]
             self._sensors.records_fetch_lag.record(highwater - last_offset)
             num_bytes = sum(msg[1] for msg in messages)
@@ -757,8 +670,7 @@ class Fetcher(six.Iterator):
                 self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
             self._sensors.fetch_latency.record((recv_time - send_time) * 1000)

-    class PartitionRecords(object):
-
+    class PartitionRecords(six.Iterator):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
@@ -780,7 +692,7 @@ class Fetcher(six.Iterator):
            self.messages = self.messages[n:]
            if self.messages:
-                self.fetch_offset = self.messages[0][0]
+                self.fetch_offset = self.messages[0].offset
            return res
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 08f50e9..83a3531 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -128,7 +128,7 @@ class KafkaConsumer(six.Iterator):
             [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
-            iterator). Default -1 (block forever).
+            iterator). Default block forever [float('inf')].
         skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
             caused some messages to be corrupted via double-compression.
             By default, the fetcher will return these messages as a compressed
@@ -226,7 +226,7 @@ class KafkaConsumer(six.Iterator):
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
-        'consumer_timeout_ms': -1,
+        'consumer_timeout_ms': float('inf'),
         'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
@@ -298,8 +298,6 @@ class KafkaConsumer(six.Iterator):
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
-        self._iterator = None
-        self._consumer_timeout = float('inf')

         if topics:
             self._subscription.subscribe(topics=topics)
@@ -486,7 +484,7 @@ class KafkaConsumer(six.Iterator):
         """
         return self._client.cluster.partitions_for_topic(topic)

-    def poll(self, timeout_ms=0):
+    def poll(self, timeout_ms=0, max_records=None):
         """Fetch data from assigned topics / partitions.

         Records are fetched and returned in batches by topic-partition.
@@ -508,13 +506,14 @@ class KafkaConsumer(six.Iterator):
             subscribed list of topics and partitions
         """
         assert timeout_ms >= 0, 'Timeout must not be negative'
-        assert self._iterator is None, 'Incompatible with iterator interface'
+        if max_records is None:
+            max_records = self.config['max_poll_records']

         # poll for new data until the timeout expires
         start = time.time()
         remaining = timeout_ms
         while True:
-            records = self._poll_once(remaining)
+            records = self._poll_once(remaining, max_records)
             if records:
                 # before returning the fetched records, we can send off the
                 # next round of fetches and avoid block waiting for their
@@ -529,7 +528,7 @@ class KafkaConsumer(six.Iterator):
             if remaining <= 0:
                 return {}

-    def _poll_once(self, timeout_ms):
+    def _poll_once(self, timeout_ms, max_records):
         """
         Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
@@ -548,23 +547,22 @@ class KafkaConsumer(six.Iterator):
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
             self._coordinator.ensure_coordinator_known()

-
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
             self._update_fetch_positions(self._subscription.missing_fetch_positions())

-        # init any new fetches (won't resend pending fetches)
-        records = self._fetcher.fetched_records()
-
         # if data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
+        records = self._fetcher.fetched_records(max_records)
         if records:
             return records

+        # send any new fetches (won't resend pending fetches)
         self._fetcher.send_fetches()
+
         self._client.poll(timeout_ms=timeout_ms, sleep=True)
-        return self._fetcher.fetched_records()
+        return self._fetcher.fetched_records(max_records)

     def position(self, partition):
         """Get the offset of the next record that will be fetched
@@ -835,96 +833,17 @@ class KafkaConsumer(six.Iterator):
         # then do any offset lookups in case some positions are not known
         self._fetcher.update_fetch_positions(partitions)

-    def _message_generator(self):
-        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
-        while time.time() < self._consumer_timeout:
-
-            if self._use_consumer_group():
-                self._coordinator.ensure_coordinator_known()
-                self._coordinator.ensure_active_group()
-
-            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
-
-            # fetch offsets for any subscribed partitions that we arent tracking yet
-            if not self._subscription.has_all_fetch_positions():
-                partitions = self._subscription.missing_fetch_positions()
-                self._update_fetch_positions(partitions)
-
-            poll_ms = 1000 * (self._consumer_timeout - time.time())
-            if not self._fetcher.in_flight_fetches():
-                poll_ms = 0
-            self._client.poll(timeout_ms=poll_ms, sleep=True)
-
-            # We need to make sure we at least keep up with scheduled tasks,
-            # like heartbeats, auto-commits, and metadata refreshes
-            timeout_at = self._next_timeout()
-
-            # Because the consumer client poll does not sleep unless blocking on
-            # network IO, we need to explicitly sleep when we know we are idle
-            # because we haven't been assigned any partitions to fetch / consume
-            if self._use_consumer_group() and not self.assignment():
-                sleep_time = max(timeout_at - time.time(), 0)
-                if sleep_time > 0 and not self._client.in_flight_request_count():
-                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                    time.sleep(sleep_time)
-                    continue
-
-            # Short-circuit the fetch iterator if we are already timed out
-            # to avoid any unintentional interaction with fetcher setup
-            if time.time() > timeout_at:
-                continue
-
-            for msg in self._fetcher:
-                yield msg
-                if time.time() > timeout_at:
-                    log.debug("internal iterator timeout - breaking for poll")
-                    break
-
-            # an else block on a for loop only executes if there was no break
-            # so this should only be called on a StopIteration from the fetcher
-            # and we assume that it is safe to send_fetches when fetcher is done
-            # i.e., there are no more records stored internally
-            else:
-                self._fetcher.send_fetches()
-
-    def _next_timeout(self):
-        timeout = min(self._consumer_timeout,
-                      self._client._delayed_tasks.next_at() + time.time(),
-                      self._client.cluster.ttl() / 1000.0 + time.time())
-
-        # Although the delayed_tasks timeout above should cover processing
-        # HeartbeatRequests, it is still possible that HeartbeatResponses
-        # are left unprocessed during a long _fetcher iteration without
-        # an intermediate poll(). And because tasks are responsible for
-        # rescheduling themselves, an unprocessed response will prevent
-        # the next heartbeat from being sent. This check should help
-        # avoid that.
-        if self._use_consumer_group():
-            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
-            timeout = min(timeout, heartbeat)
-        return timeout
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self

     def __next__(self):
-        if not self._iterator:
-            self._iterator = self._message_generator()
-
-        self._set_consumer_timeout()
-        try:
-            return next(self._iterator)
-        except StopIteration:
-            self._iterator = None
-            raise
-
-    def _set_consumer_timeout(self):
-        # consumer_timeout_ms can be used to stop iteration early
-        if self.config['consumer_timeout_ms'] >= 0:
-            self._consumer_timeout = time.time() + (
-                self.config['consumer_timeout_ms'] / 1000.0)
+        ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
+        if not ret:
+            raise StopIteration
+        assert len(ret.keys()) == 1
+        (messages,) = ret.values()
+        assert len(messages) == 1
+        return messages[0]

     # old KafkaConsumer methods are deprecated
     def configure(self, **configs):
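
For reference, the buffering side of the change is that message sets are now unpacked greedily when a fetch response completes, so `PartitionRecords` holds already-unpacked record objects (with an `.offset` attribute) rather than raw `(offset, size, message)` tuples, and `take(n)` / `discard()` operate on that flat list. A simplified, self-contained sketch of that behavior, not the library code itself (the `discard()` body and the `Record` type are illustrative assumptions):

```python
import collections

# Stand-in for an unpacked consumer record; only `offset` matters here.
Record = collections.namedtuple('Record', ['offset', 'value'])

class PartitionRecords(object):
    """Buffers records that were unpacked when the fetch completed."""

    def __init__(self, fetch_offset, tp, messages):
        self.fetch_offset = fetch_offset
        self.topic_partition = tp
        self.messages = messages

    def discard(self):
        # Drop the remaining buffer, e.g. when the records are stale
        # relative to the consumer's current position (illustrative).
        self.messages = []

    def take(self, n):
        # Return up to n records and advance fetch_offset greedily.
        res = self.messages[:n]
        self.messages = self.messages[n:]
        if self.messages:
            self.fetch_offset = self.messages[0].offset
        return res

# poll(max_records=N) drains these buffers in N-sized chunks via take():
part = PartitionRecords(0, ('my-topic', 0),
                        [Record(i, 'msg-%d' % i) for i in range(5)])
print([r.offset for r in part.take(2)])  # [0, 1]
print(part.fetch_offset)                 # 2
```

Because records are unpacked up front, offset bookkeeping in `take()` no longer has to peek inside compressed message sets mid-iteration, which is what allowed the generator-based iterators above to be removed.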