diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 522d6ca..657024f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -338,7 +338,19 @@ class SimpleConsumer(Consumer): with FetchContext(self, block, timeout): self._fetch() try: - return self.queue.get_nowait() + partition, message = self.queue.get_nowait() + + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if self.partition_info: + return partition, message + else: + return message except Empty: return None @@ -380,18 +392,8 @@ class SimpleConsumer(Consumer): partition = resp.partition try: for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) - else: - self.queue.put(message) + self.queue.put((partition, message)) except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): @@ -577,12 +579,11 @@ class MultiProcessConsumer(Consumer): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -624,7 +625,7 @@ class MultiProcessConsumer(Consumer): messages.append(message) # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.count_since_commit += 1 self._auto_commit() count -= 1 |