summary | refs | log | tree | commit | diff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
author: Omar Ghishan <omar.ghishan@rd.io> 2014-01-15 16:22:41 -0800
committer: Omar Ghishan <omar.ghishan@rd.io> 2014-01-15 16:26:15 -0800
commit: e0f726204ab0b8b8ae5c29ae07c1aa369a5a6906 (patch)
tree: da86154e8eb8fad94419502e2729fa6288bda9c3 /kafka/consumer.py
parent: 8b3793a649b470879d2684ad0a127c32a1348682 (diff)
download: kafka-python-e0f726204ab0b8b8ae5c29ae07c1aa369a5a6906.tar.gz
Make get_messages() update and commit offsets just before returning
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- kafka/consumer.py 51
1 files changed, 35 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 474e1f5..12e1af6 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-from collections import defaultdict
from itertools import izip_longest, repeat
import logging
import time
@@ -318,10 +317,17 @@ class SimpleConsumer(Consumer):
if timeout is not None:
max_time = time.time() + timeout
+ new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
- message = self.get_message(block, timeout)
- if message:
- messages.append(message)
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
count -= 1
else:
# Ran out of messages for the last request.
@@ -333,9 +339,17 @@ class SimpleConsumer(Consumer):
# appropriate value
timeout = max_time - time.time()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
return messages
- def get_message(self, block=True, timeout=0.1):
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
if self.queue.empty():
# We're out of messages, go grab some more.
with FetchContext(self, block, timeout):
@@ -343,14 +357,17 @@ class SimpleConsumer(Consumer):
try:
partition, message = self.queue.get_nowait()
- # Update partition offset
- self.offsets[partition] = message.offset + 1
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
- if self.partition_info:
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
return partition, message
else:
return message
@@ -613,6 +630,7 @@ class MultiProcessConsumer(Consumer):
if timeout is not None:
max_time = time.time() + timeout
+ new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
@@ -627,11 +645,7 @@ class MultiProcessConsumer(Consumer):
break
messages.append(message)
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset + 1
- self.count_since_commit += 1
- self._auto_commit()
+ new_offsets[partition] = message.offset + 1
count -= 1
if timeout is not None:
timeout = max_time - time.time()
@@ -640,4 +654,9 @@ class MultiProcessConsumer(Consumer):
self.start.clear()
self.pause.set()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
return messages