summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-31 11:21:16 -0800
committerDana Powers <dana.powers@rd.io>2015-12-31 18:12:20 -0800
commit89e22a0e457ac4f6ddbf237ff32e5a278c2c02ed (patch)
treeb0fc70cd341d1ac7e81f61bfda2f11dd897a24c3
parent61ccbc5f7bd1527096c4609f2e881e6a1075e579 (diff)
downloadkafka-python-89e22a0e457ac4f6ddbf237ff32e5a278c2c02ed.tar.gz
Improve request pipelining in consumer iterator
-rw-r--r--kafka/consumer/fetcher.py38
-rw-r--r--kafka/consumer/group.py3
2 files changed, 28 insertions, 13 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index fc03d7a..5e15424 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -197,6 +197,9 @@ class Fetcher(object):
contains OffsetOutOfRangeError and the default_reset_policy is
None
"""
+ if not self._offset_out_of_range_partitions:
+ return
+
current_out_of_range_partitions = {}
# filter only the fetchable partitions
@@ -232,18 +235,20 @@ class Fetcher(object):
Raises:
RecordTooLargeError: if there is a message larger than fetch size
"""
+ if not self._record_too_large_partitions:
+ return
+
copied_record_too_large_partitions = dict(self._record_too_large_partitions)
self._record_too_large_partitions.clear()
- if copied_record_too_large_partitions:
- raise RecordTooLargeError(
- "There are some messages at [Partition=Offset]: %s "
- " whose size is larger than the fetch size %s"
- " and hence cannot be ever returned."
- " Increase the fetch size, or decrease the maximum message"
- " size the broker will allow.",
- copied_record_too_large_partitions,
- self.config['max_partition_fetch_bytes'])
+ raise RecordTooLargeError(
+ "There are some messages at [Partition=Offset]: %s "
+ " whose size is larger than the fetch size %s"
+ " and hence cannot be ever returned."
+ " Increase the fetch size, or decrease the maximum message"
+ " size the broker will allow.",
+ copied_record_too_large_partitions,
+ self.config['max_partition_fetch_bytes'])
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets.
@@ -324,11 +329,13 @@ class Fetcher(object):
if self._subscriptions.needs_partition_assignment:
raise StopIteration('Subscription needs partition assignment')
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
-
while self._records:
+
+ # Check on each iteration since this is a generator
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
(fetch_offset, tp, messages) = self._records.popleft()
if not self._subscriptions.is_assigned(tp):
@@ -361,6 +368,11 @@ class Fetcher(object):
log.warning("Ignoring fetched records for %s at offset %s",
tp, fetch_offset)
+ # Send any additional FetchRequests that we can now
+ # this will likely fetch each partition individually, rather than
+ # fetch multiple partitions in bulk when they are on the same broker
+ self.init_fetches()
+
def _deserialize(self, msg):
if self.config['key_deserializer']:
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index bde283c..67e352a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,5 +585,8 @@ class KafkaConsumer(object):
# init any new fetches (won't resend pending fetches)
self._fetcher.init_fetches()
self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+ timeout = time.time() + self.config['heartbeat_interval_ms'] / 1000.0
for msg in self._fetcher:
yield msg
+ if time.time() > timeout:
+ break