diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-15 16:22:41 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-15 16:26:15 -0800 |
commit | e0f726204ab0b8b8ae5c29ae07c1aa369a5a6906 (patch) | |
tree | da86154e8eb8fad94419502e2729fa6288bda9c3 /kafka/consumer.py | |
parent | 8b3793a649b470879d2684ad0a127c32a1348682 (diff) | |
download | kafka-python-e0f726204ab0b8b8ae5c29ae07c1aa369a5a6906.tar.gz |
Make get_messages() update and commit offsets just before returning
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 51 |
1 files changed, 35 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 474e1f5..12e1af6 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -from collections import defaultdict from itertools import izip_longest, repeat import logging import time @@ -318,10 +317,17 @@ class SimpleConsumer(Consumer): if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): - message = self.get_message(block, timeout) - if message: - messages.append(message) + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 count -= 1 else: # Ran out of messages for the last request. @@ -333,9 +339,17 @@ class SimpleConsumer(Consumer): # appropriate value timeout = max_time - time.time() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() return messages - def get_message(self, block=True, timeout=0.1): + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): if self.queue.empty(): # We're out of messages, go grab some more. with FetchContext(self, block, timeout): @@ -343,14 +357,17 @@ class SimpleConsumer(Consumer): try: partition, message = self.queue.get_nowait() - # Update partition offset - self.offsets[partition] = message.offset + 1 + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() - if self.partition_info: + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: return partition, message else: return message @@ -613,6 +630,7 @@ class MultiProcessConsumer(Consumer): if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not @@ -627,11 +645,7 @@ class MultiProcessConsumer(Consumer): break messages.append(message) - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + 1 - self.count_since_commit += 1 - self._auto_commit() + new_offsets[partition] = message.offset + 1 count -= 1 if timeout is not None: timeout = max_time - time.time() @@ -640,4 +654,9 @@ class MultiProcessConsumer(Consumer): self.start.clear() self.pause.set() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages |