| author | Omar <omar.ghishan@rd.io> | 2014-01-13 13:52:02 -0800 | 
|---|---|---|
| committer | Omar <omar.ghishan@rd.io> | 2014-01-13 13:52:02 -0800 | 
| commit | 87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (patch) | |
| tree | c55c3c5fea1fab6eef77f5213909ed2c2f8acc92 /kafka/consumer.py | |
| parent | 354fcdbdd9b34b3454b964e6dc0d4a746744bbcd (diff) | |
| parent | a0c7141e2cc7399a9472a8169ea5f730f0407386 (diff) | |
| download | kafka-python-87c7f9dedfc008e3fff7a010cc4e708eeec5bebe.tar.gz | |
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
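Taken together, these changes surface as new constructor options. A minimal usage sketch (not part of the commit): the broker address, group, and topic names are placeholders, and the `timeout` keyword on `KafkaClient` is assumed from the socket-timeout bullet above.

```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Socket timeout in seconds (assumed kwarg, per the socket-timeout change)
kafka = KafkaClient("localhost", 9092, timeout=10)

consumer = SimpleConsumer(
    kafka, "my-group", "my-topic",  # placeholder group and topic
    buffer_size=4096,               # initial fetch buffer; doubles when too small
    max_buffer_size=32768,          # upper limit on the doubling; None = unbounded
    iter_timeout=5)                 # iterator exits after 5s without messages

# The iterator now drains an internal queue filled by batched fetches
for message in consumer:
    print message
```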
Diffstat (limited to 'kafka/consumer.py')
| -rw-r--r-- | kafka/consumer.py | 269 |
|---|---|---|
1 file changed, 140 insertions, 129 deletions
```diff
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 226700e..eba2912 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -5,8 +5,8 @@ from itertools import izip_longest, repeat
 import logging
 import time
 from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -24,6 +24,11 @@ AUTO_COMMIT_INTERVAL = 5000
 FETCH_DEFAULT_BLOCK_TIMEOUT = 1
 FETCH_MAX_WAIT_TIME = 100
 FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 
 
 class FetchContext(object):
@@ -34,13 +39,15 @@ class FetchContext(object):
         self.consumer = consumer
         self.block = block
 
-        if block and not timeout:
-            timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
-        self.timeout = timeout * 1000
+        if block:
+            if not timeout:
+                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+            self.timeout = timeout * 1000
 
     def __enter__(self):
         """Set fetch values based on blocking status"""
+        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
         if self.block:
             self.consumer.fetch_max_wait_time = self.timeout
             self.consumer.fetch_min_bytes = 1
@@ -48,9 +55,9 @@ class FetchContext(object):
             self.consumer.fetch_min_bytes = 0
 
     def __exit__(self, type, value, traceback):
-        """Reset values to default"""
-        self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
-        self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+        """Reset values"""
+        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
 
 
 class Consumer(object):
@@ -206,8 +213,14 @@ class SimpleConsumer(Consumer):
                          before a commit
     auto_commit_every_t: default 5000. How much time (in milliseconds) to
                          wait before commit
-    fetch_size_bytes:    number of bytes to request in a FetchRequest
+    buffer_size:         default 4K. Initial number of bytes to tell kafka we
+                         have available. This will double as needed.
+    max_buffer_size:     default 16K. Max number of bytes to tell kafka we have
+                         available. None means no limit.
+    iter_timeout:        default None. How much time (in seconds) to wait for a
+                         message in the iterator before exiting. None means no
+                         timeout, so it will wait forever.
 
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -218,12 +231,23 @@ class SimpleConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES):
-
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
+                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+                 iter_timeout=None):
+
+        if max_buffer_size is not None and buffer_size > max_buffer_size:
+            raise ValueError("buffer_size (%d) is greater than "
+                             "max_buffer_size (%d)" %
+                             (buffer_size, max_buffer_size))
+        self.buffer_size = buffer_size
+        self.max_buffer_size = max_buffer_size
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
+        self.iter_timeout = iter_timeout
+        self.queue = Queue()
 
         super(SimpleConsumer, self).__init__(
             client, group, topic,
@@ -267,12 +291,6 @@ class SimpleConsumer(Consumer):
                     reqs.append(OffsetRequest(self.topic, partition, -2, 1))
                 elif whence == 2:
                     reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
-                    # The API returns back the next available offset
-                    # For eg: if the current offset is 18, the API will return
-                    # back 19. So, if we have to seek 5 points before, we will
-                    # end up going back to 14, instead of 13. Adjust this
-                    deltas[partition] -= 1
                 else:
                     pass
@@ -289,123 +307,111 @@ class SimpleConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages is fetched. If None,
+                 it will block forever.
         """
         messages = []
-        iterator = self.__iter__()
-
-        # HACK: This splits the timeout between available partitions
         if timeout:
-            timeout = timeout * 1.0 / len(self.offsets)
+            max_time = time.time() + timeout
 
-        with FetchContext(self, block, timeout):
-            while count > 0:
-                try:
-                    messages.append(next(iterator))
-                except StopIteration:
-                    break
+        while count > 0 and (timeout is None or timeout > 0):
+            message = self.get_message(block, timeout)
+            if message:
+                messages.append(message)
                 count -= 1
+            else:
+                # Ran out of messages for the last request.
+                if not block:
+                    # If we're not blocking, break.
+                    break
+                if timeout:
+                    # If we're blocking and have a timeout, reduce it to the
+                    # appropriate value
+                    timeout = max_time - time.time()
 
         return messages
 
-    def __iter__(self):
-        """
-        Create an iterate per partition. Iterate through them calling next()
-        until they are all exhausted.
-        """
-        iters = {}
-        for partition, offset in self.offsets.items():
-            iters[partition] = self.__iter_partition__(partition, offset)
+    def get_message(self, block=True, timeout=0.1):
+        if self.queue.empty():
+            # We're out of messages, go grab some more.
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            return self.queue.get_nowait()
+        except Empty:
+            return None
 
-        if len(iters) == 0:
-            return
+    def __iter__(self):
+        if self.iter_timeout is None:
+            timeout = ITER_TIMEOUT_SECONDS
+        else:
+            timeout = self.iter_timeout
 
         while True:
-            if len(iters) == 0:
+            message = self.get_message(True, timeout)
+            if message:
+                yield message
+            elif self.iter_timeout is None:
+                # We did not receive any message yet but we don't have a
+                # timeout, so give up the CPU for a while before trying again
+                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+            else:
+                # Timed out waiting for a message
                 break
 
-            for partition, it in iters.items():
+    def _fetch(self):
+        # Create fetch request payloads for all the partitions
+        requests = []
+        partitions = self.offsets.keys()
+        while partitions:
+            for partition in partitions:
+                requests.append(FetchRequest(self.topic, partition,
+                                             self.offsets[partition],
+                                             self.buffer_size))
+            # Send request
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = set()
+            for resp in responses:
+                partition = resp.partition
                 try:
-                    if self.partition_info:
-                        yield (partition, it.next())
+                    for message in resp.messages:
+                        # Update partition offset
+                        self.offsets[partition] = message.offset + 1
+
+                        # Count, check and commit messages if necessary
+                        self.count_since_commit += 1
+                        self._auto_commit()
+
+                        # Put the message in our queue
+                        if self.partition_info:
+                            self.queue.put((partition, message))
+                        else:
+                            self.queue.put(message)
+                except ConsumerFetchSizeTooSmall, e:
+                    if (self.max_buffer_size is not None and
+                            self.buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise e
+                    if self.max_buffer_size is None:
+                        self.buffer_size *= 2
                     else:
-                        yield it.next()
+                        self.buffer_size = max(self.buffer_size * 2,
+                                               self.max_buffer_size)
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", self.buffer_size)
+                    retry_partitions.add(partition)
+                except ConsumerNoMoreData, e:
+                    log.debug("Iteration was ended by %r", e)
                 except StopIteration:
+                    # Stop iterating through this partition
                     log.debug("Done iterating over partition %s" % partition)
-                    del iters[partition]
-
-                    # skip auto-commit since we didn't yield anything
-                    continue
-
-                # Count, check and commit messages if necessary
-                self.count_since_commit += 1
-                self._auto_commit()
-
-    def __iter_partition__(self, partition, offset):
-        """
-        Iterate over the messages in a partition. Create a FetchRequest
-        to get back a batch of messages, yield them one at a time.
-        After a batch is exhausted, start a new batch unless we've reached
-        the end of this partition.
-        """
-
-        # The offset that is stored in the consumer is the offset that
-        # we have consumed. In subsequent iterations, we are supposed to
-        # fetch the next message (that is from the next offset)
-        # However, for the 0th message, the offset should be as-is.
-        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
-        # problematic, since 0 is offset of a message which we have not yet
-        # consumed.
-        if self.fetch_started[partition]:
-            offset += 1
-
-        fetch_size = self.fetch_min_bytes
-
-        while True:
-            # use MaxBytes = client's bufsize since we're only
-            # fetching one topic + partition
-            req = FetchRequest(
-                self.topic, partition, offset, self.client.bufsize)
-
-            (resp,) = self.client.send_fetch_request(
-                [req],
-                max_wait_time=self.fetch_max_wait_time,
-                min_bytes=fetch_size)
-
-            assert resp.topic == self.topic
-            assert resp.partition == partition
-
-            next_offset = None
-            try:
-                for message in resp.messages:
-                    next_offset = message.offset
-
-                    # update the offset before the message is yielded. This
-                    # is so that the consumer state is not lost in certain
-                    # cases.
-                    #
-                    # For eg: the message is yielded and consumed by the
-                    # caller, but the caller does not come back into the
-                    # generator again. The message will be consumed but the
-                    # status will not be updated in the consumer
-                    self.fetch_started[partition] = True
-                    self.offsets[partition] = message.offset
-                    yield message
-            except ConsumerFetchSizeTooSmall, e:
-                fetch_size *= 1.5
-                log.warn(
-                    "Fetch size too small, increasing to %d (1.5x) and retry",
-                    fetch_size)
-                continue
-            except ConsumerNoMoreData, e:
-                log.debug("Iteration was ended by %r", e)
-
-            if next_offset is None:
-                break
-            else:
-                offset = next_offset + 1
-
+                partitions = retry_partitions
 
 
 def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     """
@@ -443,8 +449,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
         # indicates a specific number of messages, follow that advice
         count = 0
 
-        for partition, message in consumer:
-            queue.put((partition, message))
+        message = consumer.get_message()
+        if message:
+            queue.put(message)
             count += 1
 
             # We have reached the required size. The controller might have
@@ -454,12 +461,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # can reset the 'start' event
             if count == size.value:
                 pause.wait()
-                break
 
-        # In case we did not receive any message, give up the CPU for
-        # a while before we try again
-        if count == 0:
-            time.sleep(0.1)
+        else:
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
 
     consumer.stop()
@@ -504,7 +510,7 @@ class MultiProcessConsumer(Consumer):
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue(1024)    # Child consumers dump messages into this
+        self.queue = MPQueue(1024)  # Child consumers dump messages into this
         self.start = Event()        # Indicates the consumers to start fetch
         self.exit = Event()         # Requests the consumers to shutdown
         self.pause = Event()        # Requests the consumers to pause fetch
@@ -586,8 +592,9 @@ class MultiProcessConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages is fetched. If None,
+                 it will block forever.
         """
 
         messages = []
@@ -598,7 +605,10 @@ class MultiProcessConsumer(Consumer):
         self.size.value = count
         self.pause.clear()
 
-        while count > 0:
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
@@ -618,6 +628,7 @@ class MultiProcessConsumer(Consumer):
             self.count_since_commit += 1
             self._auto_commit()
             count -= 1
+            timeout = max_time - time.time()
 
         self.size.value = 0
         self.start.clear()
```
