summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer.py33
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 522d6ca..657024f 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -338,7 +338,19 @@ class SimpleConsumer(Consumer):
with FetchContext(self, block, timeout):
self._fetch()
try:
- return self.queue.get_nowait()
+ partition, message = self.queue.get_nowait()
+
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ if self.partition_info:
+ return partition, message
+ else:
+ return message
except Empty:
return None
@@ -380,18 +392,8 @@ class SimpleConsumer(Consumer):
partition = resp.partition
try:
for message in resp.messages:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
# Put the message in our queue
- if self.partition_info:
- self.queue.put((partition, message))
- else:
- self.queue.put(message)
+ self.queue.put((partition, message))
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
@@ -577,12 +579,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -624,7 +625,7 @@ class MultiProcessConsumer(Consumer):
messages.append(message)
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.count_since_commit += 1
self._auto_commit()
count -= 1