diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 109 |
1 files changed, 70 insertions, 39 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 5be1bef..28b53ec 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -from collections import defaultdict from itertools import izip_longest, repeat import logging import time @@ -235,6 +234,12 @@ class SimpleConsumer(Consumer): buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) if max_buffer_size is not None and buffer_size > max_buffer_size: raise ValueError("buffer_size (%d) is greater than " @@ -245,17 +250,10 @@ class SimpleConsumer(Consumer): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes - self.fetch_started = defaultdict(bool) # defaults to false + self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout self.queue = Queue() - super(SimpleConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) @@ -305,6 +303,10 @@ class SimpleConsumer(Consumer): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + self.queue = Queue() + def get_messages(self, count=1, block=True, timeout=0.1): """ Fetch the specified number of messages @@ -316,33 +318,69 @@ class SimpleConsumer(Consumer): it will block forever. """ messages = [] - if timeout: + if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): - message = self.get_message(block, timeout) - if message: - messages.append(message) + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 count -= 1 else: # Ran out of messages for the last request. if not block: # If we're not blocking, break. break - if timeout: + if timeout is not None: # If we're blocking and have a timeout, reduce it to the # appropriate value timeout = max_time - time.time() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() return messages - def get_message(self, block=True, timeout=0.1): + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): + """ + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message + """ if self.queue.empty(): # We're out of messages, go grab some more. with FetchContext(self, block, timeout): self._fetch() try: - return self.queue.get_nowait() + partition, message = self.queue.get_nowait() + + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: + return partition, message + else: + return message except Empty: return None @@ -367,11 +405,11 @@ class SimpleConsumer(Consumer): def _fetch(self): # Create fetch request payloads for all the partitions requests = [] - partitions = self.offsets.keys() + partitions = self.fetch_offsets.keys() while partitions: for partition in partitions: requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], + self.fetch_offsets[partition], self.buffer_size)) # Send request responses = self.client.send_fetch_request( @@ -384,18 +422,9 @@ class SimpleConsumer(Consumer): partition = resp.partition try: for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) - else: - self.queue.put(message) + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): @@ -585,12 +614,11 @@ class MultiProcessConsumer(Consumer): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -613,9 +641,10 @@ class MultiProcessConsumer(Consumer): self.size.value = count self.pause.clear() - if timeout: + if timeout is not None: max_time = time.time() + timeout + new_offsets = {} while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not @@ -630,16 +659,18 @@ class MultiProcessConsumer(Consumer): break messages.append(message) - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset - self.count_since_commit += 1 - self._auto_commit() + new_offsets[partition] = message.offset + 1 count -= 1 - timeout = max_time - time.time() + if timeout is not None: + timeout = max_time - time.time() self.size.value = 0 self.start.clear() self.pause.set() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages |