diff options
-rw-r--r-- | kafka/consumer/simple.py | 62 |
1 files changed, 33 insertions, 29 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 82a1fe2..b08255b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -126,8 +126,8 @@ class SimpleConsumer(Consumer): auto_commit_every_t=auto_commit_every_t) if max_buffer_size is not None and buffer_size > max_buffer_size: - raise ValueError("buffer_size (%d) is greater than " - "max_buffer_size (%d)" % + raise ValueError('buffer_size (%d) is greater than ' + 'max_buffer_size (%d)' % (buffer_size, max_buffer_size)) self.buffer_size = buffer_size self.max_buffer_size = max_buffer_size @@ -227,7 +227,7 @@ class SimpleConsumer(Consumer): self.offsets[resp.partition] = \ resp.offsets[0] + deltas[resp.partition] else: - raise ValueError("Unexpected value for `whence`, %d" % whence) + raise ValueError('Unexpected value for `whence`, %d' % whence) # Reset queue and fetch offsets since they are invalid self.fetch_offsets = self.offsets.copy() @@ -250,35 +250,32 @@ class SimpleConsumer(Consumer): """ messages = [] if timeout is not None: - max_time = time.time() + timeout + timeout += time.time() new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - result = self._get_message(block, timeout, get_partition_info=True, + log.debug('getting %d messages', count) + while len(messages) < count: + block_time = timeout - time.time() + log.debug('calling _get_message block=%s timeout=%s', block, block_time) + result = self._get_message(block, block_time, + get_partition_info=True, update_offset=False) - if result: - partition, message = result - if self.partition_info: - messages.append(result) - else: - messages.append(message) - new_offsets[partition] = message.offset + 1 - count -= 1 - else: - # Ran out of messages for the last request. - if not block: - # If we're not blocking, break. - break + log.debug('got %s from _get_messages', result) + if not result: + if block and (timeout is None or time.time() <= timeout): + continue + break - # If we have a timeout, reduce it to the - # appropriate value - if timeout is not None: - timeout = max_time - time.time() + partition, message = result + _msg = (partition, message) if self.partition_info else message + messages.append(_msg) + new_offsets[partition] = message.offset + 1 # Update and commit offsets if necessary self.offsets.update(new_offsets) self.count_since_commit += len(messages) self._auto_commit() + log.debug('got %d messages: %s', len(messages), messages) return messages def get_message(self, block=True, timeout=0.1, get_partition_info=None): @@ -292,10 +289,16 @@ class SimpleConsumer(Consumer): If get_partition_info is True, returns (partition, message) If get_partition_info is False, returns message """ - if self.queue.empty(): + start_at = time.time() + while self.queue.empty(): # We're out of messages, go grab some more. + log.debug('internal queue empty, fetching more messages') with FetchContext(self, block, timeout): self._fetch() + + if not block or time.time() > (start_at + timeout): + break + try: partition, message = self.queue.get_nowait() @@ -314,6 +317,7 @@ class SimpleConsumer(Consumer): else: return message except Empty: + log.debug('internal queue empty after fetch - returning None') return None def __iter__(self): @@ -396,7 +400,7 @@ class SimpleConsumer(Consumer): except ConsumerFetchSizeTooSmall: if (self.max_buffer_size is not None and buffer_size == self.max_buffer_size): - log.error("Max fetch size %d too small", + log.error('Max fetch size %d too small', self.max_buffer_size) raise if self.max_buffer_size is None: @@ -404,12 +408,12 @@ class SimpleConsumer(Consumer): else: buffer_size = min(buffer_size * 2, self.max_buffer_size) - log.warn("Fetch size too small, increase to %d (2x) " - "and retry", buffer_size) + log.warning('Fetch size too small, increase to %d (2x) ' + 'and retry', buffer_size) retry_partitions[partition] = buffer_size except ConsumerNoMoreData as e: - log.debug("Iteration was ended by %r", e) + log.debug('Iteration was ended by %r', e) except StopIteration: # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) + log.debug('Done iterating over partition %s', partition) partitions = retry_partitions |