diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-15 11:13:59 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-15 11:13:59 -0800 |
commit | c36cb618d2ba5fcba118dd8b87bf51f26074c8f1 (patch) | |
tree | edb10f23221e7cf5a8ca1ffb62f7a8195283ecfa /kafka/consumer.py | |
parent | 9644166048d6fe1cdbd1fc3096329ee2142b147e (diff) | |
download | kafka-python-c36cb618d2ba5fcba118dd8b87bf51f26074c8f1.tar.gz |
Fix offset increments:
* Increment the offset before returning a message rather than when
putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 522d6ca..657024f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -338,7 +338,19 @@ class SimpleConsumer(Consumer): with FetchContext(self, block, timeout): self._fetch() try: - return self.queue.get_nowait() + partition, message = self.queue.get_nowait() + + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if self.partition_info: + return partition, message + else: + return message except Empty: return None @@ -380,18 +392,8 @@ class SimpleConsumer(Consumer): partition = resp.partition try: for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) - else: - self.queue.put(message) + self.queue.put((partition, message)) except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): @@ -577,12 +579,11 @@ class MultiProcessConsumer(Consumer): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -624,7 +625,7 @@ class MultiProcessConsumer(Consumer): messages.append(message) # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.count_since_commit += 1 self._auto_commit() count -= 1 |