diff options
-rw-r--r-- | kafka/consumer/fetcher.py | 38 | ||||
-rw-r--r-- | kafka/consumer/group.py | 3 |
2 files changed, 28 insertions, 13 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index fc03d7a..5e15424 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -197,6 +197,9 @@ class Fetcher(object): contains OffsetOutOfRangeError and the default_reset_policy is None """ + if not self._offset_out_of_range_partitions: + return + current_out_of_range_partitions = {} # filter only the fetchable partitions @@ -232,18 +235,20 @@ class Fetcher(object): Raises: RecordTooLargeError: if there is a message larger than fetch size """ + if not self._record_too_large_partitions: + return + copied_record_too_large_partitions = dict(self._record_too_large_partitions) self._record_too_large_partitions.clear() - if copied_record_too_large_partitions: - raise RecordTooLargeError( - "There are some messages at [Partition=Offset]: %s " - " whose size is larger than the fetch size %s" - " and hence cannot be ever returned." - " Increase the fetch size, or decrease the maximum message" - " size the broker will allow.", - copied_record_too_large_partitions, - self.config['max_partition_fetch_bytes']) + raise RecordTooLargeError( + "There are some messages at [Partition=Offset]: %s " + " whose size is larger than the fetch size %s" + " and hence cannot be ever returned." + " Increase the fetch size, or decrease the maximum message" + " size the broker will allow.", + copied_record_too_large_partitions, + self.config['max_partition_fetch_bytes']) def fetched_records(self): """Returns previously fetched records and updates consumed offsets. @@ -324,11 +329,13 @@ class Fetcher(object): if self._subscriptions.needs_partition_assignment: raise StopIteration('Subscription needs partition assignment') - self._raise_if_offset_out_of_range() - self._raise_if_unauthorized_topics() - self._raise_if_record_too_large() - while self._records: + + # Check on each iteration since this is a generator + self._raise_if_offset_out_of_range() + self._raise_if_unauthorized_topics() + self._raise_if_record_too_large() + (fetch_offset, tp, messages) = self._records.popleft() if not self._subscriptions.is_assigned(tp): @@ -361,6 +368,11 @@ class Fetcher(object): log.warning("Ignoring fetched records for %s at offset %s", tp, fetch_offset) + # Send any additional FetchRequests that we can now + # this will likely fetch each partition individually, rather than + # fetch multiple partitions in bulk when they are on the same broker + self.init_fetches() + def _deserialize(self, msg): if self.config['key_deserializer']: key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index bde283c..67e352a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -585,5 +585,8 @@ class KafkaConsumer(object): # init any new fetches (won't resend pending fetches) self._fetcher.init_fetches() self._client.poll(self.config['request_timeout_ms'] / 1000.0) + timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0 for msg in self._fetcher: yield msg + if time.time() > timeout: + break |