 kafka/consumer.py | 182 +++++++++-----------
 1 file changed, 70 insertions(+), 112 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 600c8c7..a5a3e26 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,8 +3,8 @@
 from itertools import izip_longest, repeat
 import logging
 import time
 from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -227,6 +227,7 @@ class SimpleConsumer(Consumer):
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
+        self.queue = Queue(buffer_size)
 
         super(SimpleConsumer, self).__init__(
             client, group, topic,
@@ -292,122 +293,75 @@ class SimpleConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, block for up to this many seconds until
+                 count messages are fetched; if None, block indefinitely.
         """
         messages = []
-        iterator = self.__iter__()
-
-        # HACK: This splits the timeout between available partitions
         if timeout:
-            timeout = timeout * 1.0 / len(self.offsets)
+            max_time = time.time() + timeout
 
-        with FetchContext(self, block, timeout):
-            while count > 0:
-                try:
-                    messages.append(next(iterator))
-                except StopIteration:
-                    break
+        while count > 0 and (timeout is None or timeout > 0):
+            message = self.get_message(block, timeout)
+            if message:
+                messages.append(message)
                 count -= 1
+            else:
+                # Ran out of messages for the last request. If we're not
+                # blocking, break.
+                if not block:
+                    break
+
+            if timeout:
+                timeout = max_time - time.time()
 
         return messages
 
-    def __iter__(self):
-        """
-        Create an iterate per partition. Iterate through them calling next()
-        until they are all exhausted.
-        """
-        iters = {}
-        for partition, offset in self.offsets.items():
-            iters[partition] = self.__iter_partition__(partition, offset)
-
-        if len(iters) == 0:
-            return
-
-        while True:
-            if len(iters) == 0:
-                break
-
-            for partition, it in iters.items():
-                try:
-                    if self.partition_info:
-                        yield (partition, it.next())
-                    else:
-                        yield it.next()
-                except StopIteration:
-                    log.debug("Done iterating over partition %s" % partition)
-                    del iters[partition]
-
-                    # skip auto-commit since we didn't yield anything
-                    continue
-
-                # Count, check and commit messages if necessary
-                self.count_since_commit += 1
-                self._auto_commit()
-
-    def __iter_partition__(self, partition, offset):
-        """
-        Iterate over the messages in a partition. Create a FetchRequest
-        to get back a batch of messages, yield them one at a time.
-        After a batch is exhausted, start a new batch unless we've reached
-        the end of this partition.
-        """
-
-        # The offset that is stored in the consumer is the offset that
-        # we have consumed. In subsequent iterations, we are supposed to
-        # fetch the next message (that is from the next offset)
-        # However, for the 0th message, the offset should be as-is.
-        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
-        # problematic, since 0 is offset of a message which we have not yet
-        # consumed.
-        if self.fetch_started[partition]:
-            offset += 1
-
-        fetch_size = self.fetch_min_bytes
+    def get_message(self, block=True, timeout=0.1):
+        if self.queue.empty():
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            return self.queue.get_nowait()
+        except Empty:
+            return None
 
+    def __iter__(self):
         while True:
-            # use MaxBytes = client's bufsize since we're only
-            # fetching one topic + partition
-            req = FetchRequest(
-                self.topic, partition, offset, self.buffer_size)
-
-            (resp,) = self.client.send_fetch_request(
-                [req],
-                max_wait_time=self.fetch_max_wait_time,
-                min_bytes=fetch_size)
-
-            assert resp.topic == self.topic
-            assert resp.partition == partition
+            message = self.get_message(True, 100)
+            if message:
+                yield message
+            else:
+                # In case we did not receive any message, give up the CPU
+                # for a while before we try again
+                time.sleep(0.1)
 
-            next_offset = None
+    def _fetch(self):
+        requests = []
+        partitions = self.offsets.keys()
+        for partition in partitions:
+            requests.append(FetchRequest(self.topic, partition,
+                                         self.offsets[partition],
+                                         self.buffer_size))
+        responses = self.client.send_fetch_request(
+            requests,
+            max_wait_time=int(self.fetch_max_wait_time),
+            min_bytes=self.fetch_min_bytes)
+        for resp in responses:
+            partition = resp.partition
             try:
                 for message in resp.messages:
-                    next_offset = message.offset
-
-                    # update the offset before the message is yielded. This
-                    # is so that the consumer state is not lost in certain
-                    # cases.
-                    #
-                    # For eg: the message is yielded and consumed by the
-                    # caller, but the caller does not come back into the
-                    # generator again. The message will be consumed but the
-                    # status will not be updated in the consumer
-                    self.fetch_started[partition] = True
-                    self.offsets[partition] = message.offset
-                    yield message
+                    self.offsets[partition] = message.offset + 1
+
+                    # Count, check and commit messages if necessary
+                    self.count_since_commit += 1
+                    self._auto_commit()
+
+                    if self.partition_info:
+                        self.queue.put((partition, message))
+                    else:
+                        self.queue.put(message)
             except ConsumerFetchSizeTooSmall, e:
-                fetch_size *= 1.5
-                log.warn(
-                    "Fetch size too small, increasing to %d (1.5x) and retry",
-                    fetch_size)
-                continue
+                self.buffer_size *= 2
+                log.warn("Fetch size too small, increasing to %d (2x) and retry",
+                         self.buffer_size)
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
-
-            if next_offset is None:
-                break
-            else:
-                offset = next_offset + 1
+            except StopIteration:
+                # Stop iterating through this partition
+                log.debug("Done iterating over partition %s" % partition)
 
 
 def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
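The heart of the SimpleConsumer rewrite above is a fetch-then-drain pattern: _fetch() pushes whole batches of messages onto a consumer-local Queue, and get_message() only triggers a network round trip once that queue has been drained. A minimal standalone sketch of the same pattern, with an in-memory batch source standing in for the broker (QueueBackedConsumer and the sample data are illustrative, not part of kafka-python):

    from Queue import Queue, Empty  # Python 2 stdlib, as in the diff

    class QueueBackedConsumer(object):
        """Toy model of the fetch-then-drain flow used by SimpleConsumer."""

        def __init__(self, batches):
            self.queue = Queue()
            self.batches = iter(batches)  # stand-in for FetchRequest round trips

        def _fetch(self):
            # The real _fetch() sends one FetchRequest per partition and
            # enqueues every message from the responses.
            for message in next(self.batches, []):
                self.queue.put(message)

        def get_message(self):
            # Only hit the "broker" when the local queue is empty.
            if self.queue.empty():
                self._fetch()
            try:
                return self.queue.get_nowait()
            except Empty:
                return None

    consumer = QueueBackedConsumer([["m1", "m2"], ["m3"]])
    assert [consumer.get_message() for _ in range(4)] == ["m1", "m2", "m3", None]

One consequence of this design, visible in the hunk above, is that offsets are advanced and auto-commits counted when messages are moved onto the internal queue, not when the caller actually receives them.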
@@ -446,8 +400,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # indicates a specific number of messages, follow that advice
             count = 0
 
-            for partition, message in consumer:
-                queue.put((partition, message))
+            message = consumer.get_message()
+            if message:
+                queue.put(message)
                 count += 1
 
                 # We have reached the required size. The controller might have
@@ -457,11 +412,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
                 # can reset the 'start' event
                 if count == size.value:
                     pause.wait()
-                    break
 
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            if count == 0:
+            else:
+                # In case we did not receive any message, give up the CPU for
+                # a while before we try again
                 time.sleep(0.1)
 
     consumer.stop()
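In _mp_consume, the move from iterating the consumer to one get_message() call per pass is what keeps the worker responsive: the old generator could block indefinitely waiting for data, whereas get_message() returns None on a quiet topic, so the loop regains control and can observe the exit and pause events. A stripped-down sketch of that control flow (the in-memory backlog stands in for the real consumer):

    import time
    from multiprocessing import Event

    exit_event = Event()
    backlog = ["m1", "m2"]  # stand-in for consumer.get_message()

    def get_message():
        return backlog.pop(0) if backlog else None

    received = []
    while not exit_event.is_set():
        message = get_message()
        if message is not None:
            received.append(message)
        else:
            # Quiet topic: yield the CPU briefly, then re-check the
            # events instead of blocking inside an iterator.
            time.sleep(0.1)
            exit_event.set()  # demo only: stop after one idle pass

    assert received == ["m1", "m2"]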
@@ -507,7 +461,7 @@ class MultiProcessConsumer(Consumer):
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue(1024)    # Child consumers dump messages into this
+        self.queue = MPQueue(1024)  # Child consumers dump messages into this
         self.start = Event()        # Indicates the consumers to start fetch
         self.exit = Event()         # Requests the consumers to shutdown
         self.pause = Event()        # Requests the consumers to pause fetch
@@ -589,8 +543,8 @@ class MultiProcessConsumer(Consumer):
 
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, block for up to this many seconds until
+                 count messages are fetched; if None, block indefinitely.
         """
         messages = []
@@ -601,7 +555,10 @@ class MultiProcessConsumer(Consumer):
             self.size.value = count
             self.pause.clear()
 
-        while count > 0:
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
@@ -621,6 +578,8 @@ class MultiProcessConsumer(Consumer):
             self.count_since_commit += 1
             self._auto_commit()
             count -= 1
+            if timeout:
+                timeout = max_time - time.time()
 
         self.size.value = 0
         self.start.clear()
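Both get_messages() implementations now share the same deadline idiom: the relative timeout is converted into an absolute max_time once, then recomputed as the remaining budget after each message, so the total wait honors the caller's bound no matter how many fetches occur. Isolated, the idiom looks roughly like this (get_up_to and fetch_one are illustrative names, not kafka-python APIs):

    import time

    def get_up_to(count, fetch_one, timeout=None):
        """Collect up to count items, waiting at most timeout seconds in
        total; timeout=None means wait forever."""
        items = []
        if timeout is not None:
            max_time = time.time() + timeout

        while count > 0 and (timeout is None or timeout > 0):
            item = fetch_one()
            if item is not None:
                items.append(item)
                count -= 1
            if timeout is not None:
                # Shrink the remaining budget; drops to <= 0 at the deadline.
                timeout = max_time - time.time()

        return items

    # An always-empty source returns [] after about 0.2s instead of hanging.
    print get_up_to(5, lambda: None, timeout=0.2)

Unlike the real consumers, fetch_one here never blocks, so the demo busy-waits until the deadline; in kafka-python the waiting happens inside get_message(), under FetchContext.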