diff options
-rw-r--r-- | kafka/consumer.py | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 657024f..b202b23 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -235,6 +235,12 @@ class SimpleConsumer(Consumer): buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) if max_buffer_size is not None and buffer_size > max_buffer_size: raise ValueError("buffer_size (%d) is greater than " @@ -245,17 +251,10 @@ class SimpleConsumer(Consumer): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes - self.fetch_started = defaultdict(bool) # defaults to false + self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout self.queue = Queue() - super(SimpleConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -301,6 +300,10 @@ class SimpleConsumer(Consumer): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + self.queue = Queue() + def get_messages(self, count=1, block=True, timeout=0.1): """ Fetch the specified number of messages @@ -375,11 +378,11 @@ class SimpleConsumer(Consumer): def _fetch(self): # Create fetch request payloads for all the partitions requests = [] - partitions = self.offsets.keys() + partitions = self.fetch_offsets.keys() while partitions: for partition in partitions: requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], + self.fetch_offsets[partition], self.buffer_size)) # Send request responses = self.client.send_fetch_request( @@ -394,6 +397,7 @@ class SimpleConsumer(Consumer): for message in resp.messages: # Put the message in our queue self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): |