summary refs log tree commit diff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-09-17 15:21:42 -0700
committerDana Powers <dana.powers@gmail.com>2016-09-19 21:07:14 -0700
commit5b86640067218f692268045049549a434e392ae5 (patch)
tree3cf1a7f0d6dab5edbf76b036b5eece02facc5e8a
parentae36423391cb318844f9e2f51666b1892f4dddd1 (diff)
downloadkafka-python-5b86640067218f692268045049549a434e392ae5.tar.gz
Optimizations -- use a message index instead of reslicing list; only send fetches when partition record fully consumed
-rw-r--r--kafka/consumer/fetcher.py39
-rw-r--r--kafka/consumer/group.py5
2 files changed, 24 insertions(+), 20 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a237c0c..cfed808 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -298,26 +298,32 @@ class Fetcher(six.Iterator):
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
- Returns:
- dict: {TopicPartition: [messages]}
+ Returns: (records (dict), partial (bool))
+ records: {TopicPartition: [messages]}
+ partial: True if records returned did not fully drain any pending
+ partition requests. This may be useful for choosing when to
+ pipeline additional fetch requests.
"""
if max_records is None:
max_records = self.config['max_poll_records']
if self._subscriptions.needs_partition_assignment:
- return {}
+ return {}, False
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
drained = collections.defaultdict(list)
+ partial = bool(self._records and max_records)
while self._records and max_records > 0:
part = self._records.popleft()
max_records -= self._append(drained, part, max_records)
- if part.messages:
+ if part.has_more():
self._records.appendleft(part)
- return dict(drained)
+ else:
+ partial &= False
+ return dict(drained), partial
def _append(self, drained, part, max_records):
tp = part.topic_partition
@@ -675,27 +681,24 @@ class Fetcher(six.Iterator):
self.fetch_offset = fetch_offset
self.topic_partition = tp
self.messages = messages
+ self.message_idx = 0
def discard(self):
self.messages = None
def take(self, n):
- if self.messages is None:
+ if not self.has_more():
return []
-
- if n >= len(self.messages):
- res = self.messages
- self.messages = None
- return res
-
- res = self.messages[:n]
- self.messages = self.messages[n:]
-
- if self.messages:
- self.fetch_offset = self.messages[0].offset
-
+ next_idx = self.message_idx + n
+ res = self.messages[self.message_idx:next_idx]
+ self.message_idx = next_idx
+ if self.has_more():
+ self.fetch_offset = self.messages[self.message_idx].offset
return res
+ def has_more(self):
+ return self.message_idx < len(self.messages)
+
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 83a3531..8060910 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -554,7 +554,7 @@ class KafkaConsumer(six.Iterator):
# if data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
- records = self._fetcher.fetched_records(max_records)
+ records, partial = self._fetcher.fetched_records(max_records)
if records:
return records
@@ -562,7 +562,8 @@ class KafkaConsumer(six.Iterator):
self._fetcher.send_fetches()
self._client.poll(timeout_ms=timeout_ms, sleep=True)
- return self._fetcher.fetched_records(max_records)
+ records, _ = self._fetcher.fetched_records(max_records)
+ return records
def position(self, partition):
"""Get the offset of the next record that will be fetched