diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-09-17 15:21:42 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-09-19 21:07:14 -0700 |
commit | 5b86640067218f692268045049549a434e392ae5 (patch) | |
tree | 3cf1a7f0d6dab5edbf76b036b5eece02facc5e8a | |
parent | ae36423391cb318844f9e2f51666b1892f4dddd1 (diff) | |
download | kafka-python-5b86640067218f692268045049549a434e392ae5.tar.gz |
Optimizations -- use a message index instead of reslicing list; only send fetches when partition record fully consumed
-rw-r--r-- | kafka/consumer/fetcher.py | 39 | ||||
-rw-r--r-- | kafka/consumer/group.py | 5 |
2 files changed, 24 insertions, 20 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a237c0c..cfed808 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -298,26 +298,32 @@ class Fetcher(six.Iterator):
             TopicAuthorizationError: if consumer is not authorized to fetch
                 messages from the topic
 
-        Returns:
-            dict: {TopicPartition: [messages]}
+        Returns: (records (dict), partial (bool))
+            records: {TopicPartition: [messages]}
+            partial: True if records returned did not fully drain any pending
+                partition requests. This may be useful for choosing when to
+                pipeline additional fetch requests.
         """
         if max_records is None:
             max_records = self.config['max_poll_records']
 
         if self._subscriptions.needs_partition_assignment:
-            return {}
+            return {}, False
 
         self._raise_if_offset_out_of_range()
         self._raise_if_unauthorized_topics()
         self._raise_if_record_too_large()
 
         drained = collections.defaultdict(list)
+        partial = bool(self._records and max_records)
         while self._records and max_records > 0:
             part = self._records.popleft()
             max_records -= self._append(drained, part, max_records)
-            if part.messages:
+            if part.has_more():
                 self._records.appendleft(part)
-        return dict(drained)
+            else:
+                partial &= False
+        return dict(drained), partial
 
     def _append(self, drained, part, max_records):
         tp = part.topic_partition
@@ -675,27 +681,24 @@ class Fetcher(six.Iterator):
             self.fetch_offset = fetch_offset
             self.topic_partition = tp
             self.messages = messages
+            self.message_idx = 0
 
         def discard(self):
             self.messages = None
 
         def take(self, n):
-            if self.messages is None:
+            if not self.has_more():
                 return []
-
-            if n >= len(self.messages):
-                res = self.messages
-                self.messages = None
-                return res
-
-            res = self.messages[:n]
-            self.messages = self.messages[n:]
-
-            if self.messages:
-                self.fetch_offset = self.messages[0].offset
-
+            next_idx = self.message_idx + n
+            res = self.messages[self.message_idx:next_idx]
+            self.message_idx = next_idx
+            if self.has_more():
+                self.fetch_offset = self.messages[self.message_idx].offset
             return res
 
+        def has_more(self):
+            return self.message_idx < len(self.messages)
+
 
 class FetchManagerMetrics(object):
     def __init__(self, metrics, prefix):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 83a3531..8060910 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -554,7 +554,7 @@ class KafkaConsumer(six.Iterator):
 
         # if data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
-        records = self._fetcher.fetched_records(max_records)
+        records, partial = self._fetcher.fetched_records(max_records)
         if records:
             return records
 
@@ -562,7 +562,8 @@ class KafkaConsumer(six.Iterator):
         self._fetcher.send_fetches()
         self._client.poll(timeout_ms=timeout_ms, sleep=True)
 
-        return self._fetcher.fetched_records(max_records)
+        records, _ = self._fetcher.fetched_records(max_records)
+        return records
 
     def position(self, partition):
         """Get the offset of the next record that will be fetched