summary | refs | log | tree | commit | diff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
author: Omar Ghishan <omar.ghishan@rd.io> 2014-01-15 11:13:59 -0800
committer: Omar Ghishan <omar.ghishan@rd.io> 2014-01-15 11:13:59 -0800
commit: c36cb618d2ba5fcba118dd8b87bf51f26074c8f1 (patch)
tree: edb10f23221e7cf5a8ca1ffb62f7a8195283ecfa /kafka/consumer.py
parent: 9644166048d6fe1cdbd1fc3096329ee2142b147e (diff)
download: kafka-python-c36cb618d2ba5fcba118dd8b87bf51f26074c8f1.tar.gz
Fix offset increments:
* Increment the offset before returning a message rather than when putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- kafka/consumer.py 33
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 522d6ca..657024f 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -338,7 +338,19 @@ class SimpleConsumer(Consumer):
with FetchContext(self, block, timeout):
self._fetch()
try:
- return self.queue.get_nowait()
+ partition, message = self.queue.get_nowait()
+
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ if self.partition_info:
+ return partition, message
+ else:
+ return message
except Empty:
return None
@@ -380,18 +392,8 @@ class SimpleConsumer(Consumer):
partition = resp.partition
try:
for message in resp.messages:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
# Put the message in our queue
- if self.partition_info:
- self.queue.put((partition, message))
- else:
- self.queue.put(message)
+ self.queue.put((partition, message))
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
@@ -577,12 +579,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -624,7 +625,7 @@ class MultiProcessConsumer(Consumer):
messages.append(message)
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.count_since_commit += 1
self._auto_commit()
count -= 1