author     Dana Powers <dana.powers@rd.io>  2015-12-31 11:21:16 -0800
committer  Dana Powers <dana.powers@rd.io>  2015-12-31 18:12:20 -0800
commit     89e22a0e457ac4f6ddbf237ff32e5a278c2c02ed
tree       b0fc70cd341d1ac7e81f61bfda2f11dd897a24c3
parent     61ccbc5f7bd1527096c4609f2e881e6a1075e579
Improve request pipelining in consumer iterator
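In short: the Fetcher iterator now re-checks error state on every pass and issues new FetchRequests as soon as its record buffer drains, rather than checking once up front and fetching only from the outer KafkaConsumer loop. A minimal, self-contained sketch of that pattern follows; the names are hypothetical stand-ins, and the real implementation is the Fetcher class in the diff below:

```python
from collections import deque

class PipeliningIterator(object):
    """Toy model of the iterator pattern in this commit -- not the real
    Fetcher; fetch_fn stands in for sending a FetchRequest to a broker."""

    def __init__(self, fetch_fn, batches=3):
        self._fetch_fn = fetch_fn      # returns one batch of messages
        self._records = deque()        # buffered, not-yet-consumed batches
        self._batches_left = batches   # stand-in for data still on the broker

    def init_fetches(self):
        # The real Fetcher sends FetchRequests for fetchable partitions that
        # have no request already in flight; here we just buffer one batch.
        if self._batches_left:
            self._batches_left -= 1
            self._records.append(self._fetch_fn())

    def __iter__(self):
        while self._records:
            # error checks (offset out of range, unauthorized topic, record
            # too large) belong here, re-run on each pass of the generator
            for msg in self._records.popleft():
                yield msg
        # the pipelining step: request more data the moment the buffer
        # drains, instead of waiting for the caller's next poll() cycle
        self.init_fetches()

# Drain repeatedly, as KafkaConsumer.__iter__ drains its Fetcher:
it = PipeliningIterator(lambda: ['msg-a', 'msg-b'])
it.init_fetches()        # prime the first fetch
batch = list(it)
while batch:
    print(batch)         # prints ['msg-a', 'msg-b'] three times
    batch = list(it)
```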
-rw-r--r--  kafka/consumer/fetcher.py | 38 +++++++++++++++++++++++++-------------
-rw-r--r--  kafka/consumer/group.py   |  3 +++
2 files changed, 28 insertions(+), 13 deletions(-)
```diff
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index fc03d7a..5e15424 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -197,6 +197,9 @@ class Fetcher(object):
                 contains OffsetOutOfRangeError and the default_reset_policy is
                 None
         """
+        if not self._offset_out_of_range_partitions:
+            return
+
         current_out_of_range_partitions = {}
 
         # filter only the fetchable partitions
@@ -232,18 +235,20 @@ class Fetcher(object):
         Raises:
             RecordTooLargeError: if there is a message larger than fetch size
         """
+        if not self._record_too_large_partitions:
+            return
+
         copied_record_too_large_partitions = dict(self._record_too_large_partitions)
         self._record_too_large_partitions.clear()
 
-        if copied_record_too_large_partitions:
-            raise RecordTooLargeError(
-                "There are some messages at [Partition=Offset]: %s "
-                " whose size is larger than the fetch size %s"
-                " and hence cannot be ever returned."
-                " Increase the fetch size, or decrease the maximum message"
-                " size the broker will allow.",
-                copied_record_too_large_partitions,
-                self.config['max_partition_fetch_bytes'])
+        raise RecordTooLargeError(
+            "There are some messages at [Partition=Offset]: %s "
+            " whose size is larger than the fetch size %s"
+            " and hence cannot be ever returned."
+            " Increase the fetch size, or decrease the maximum message"
+            " size the broker will allow.",
+            copied_record_too_large_partitions,
+            self.config['max_partition_fetch_bytes'])
 
     def fetched_records(self):
         """Returns previously fetched records and updates consumed offsets.
@@ -324,11 +329,13 @@ class Fetcher(object):
         if self._subscriptions.needs_partition_assignment:
             raise StopIteration('Subscription needs partition assignment')
 
-        self._raise_if_offset_out_of_range()
-        self._raise_if_unauthorized_topics()
-        self._raise_if_record_too_large()
-
         while self._records:
+
+            # Check on each iteration since this is a generator
+            self._raise_if_offset_out_of_range()
+            self._raise_if_unauthorized_topics()
+            self._raise_if_record_too_large()
+
             (fetch_offset, tp, messages) = self._records.popleft()
 
             if not self._subscriptions.is_assigned(tp):
@@ -361,6 +368,11 @@ class Fetcher(object):
                 log.warning("Ignoring fetched records for %s at offset %s",
                             tp, fetch_offset)
 
+        # Send any additional FetchRequests that we can now
+        # this will likely fetch each partition individually, rather than
+        # fetch multiple partitions in bulk when they are on the same broker
+        self.init_fetches()
+
     def _deserialize(self, msg):
         if self.config['key_deserializer']:
             key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bde283c..67e352a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,5 +585,8 @@ class KafkaConsumer(object):
             # init any new fetches (won't resend pending fetches)
             self._fetcher.init_fetches()
             self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+            timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0
             for msg in self._fetcher:
                 yield msg
+                if time.time() > timeout:
+                    break
```
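For context, the group.py hunk bounds how long KafkaConsumer drains the fetcher before control returns to client.poll(), so a deep buffered backlog cannot starve the network layer or delay group heartbeats. A standalone sketch of that idea, under the same assumptions (bounded_drain is a hypothetical helper, not part of kafka-python):

```python
import time

def bounded_drain(messages, heartbeat_interval_ms):
    """Yield from `messages` for at most one heartbeat interval.

    Mirrors the loop added to KafkaConsumer.__iter__ above: without the
    deadline, iterating a deep backlog would never hand control back to
    client.poll(), and the consumer could miss group heartbeats.
    """
    deadline = time.time() + heartbeat_interval_ms / 1000.0
    for msg in messages:
        yield msg
        if time.time() > deadline:
            break  # return to the outer loop so poll()/heartbeat can run

# e.g. inside an outer loop that also calls poll():
for msg in bounded_drain(iter(range(5)), heartbeat_interval_ms=3000):
    print(msg)
```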