author:    Omar <omar.ghishan@rd.io>  2014-01-13 13:52:02 -0800
committer: Omar <omar.ghishan@rd.io>  2014-01-13 13:52:02 -0800
commit:    87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (patch)
tree:      c55c3c5fea1fab6eef77f5213909ed2c2f8acc92 /kafka/consumer.py
parent:    354fcdbdd9b34b3454b964e6dc0d4a746744bbcd (diff)
parent:    a0c7141e2cc7399a9472a8169ea5f730f0407386 (diff)
download:  kafka-python-87c7f9dedfc008e3fff7a010cc4e708eeec5bebe.tar.gz
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from Kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* Rework the SimpleConsumer flow to fetch into an internal queue
* Fix some error handling
* Add an optional upper limit to the consumer fetch buffer size (see the usage sketch after this list)
* Add and fix unit and integration tests
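
For illustration, here is roughly how the new SimpleConsumer options and
consumption calls fit together. This is a minimal Python 2 sketch, not part of
the commit; the broker address, group, and topic names are invented, and the
message fields assume kafka-python's OffsetAndMessage tuples:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)  # hypothetical broker

    # buffer_size is the initial fetch size reported to Kafka; it doubles on
    # ConsumerFetchSizeTooSmall, capped at max_buffer_size. iter_timeout
    # bounds how long the iterator waits for a message before exiting.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              buffer_size=4096,
                              max_buffer_size=32768,
                              iter_timeout=5)

    # Non-blocking batch: return whatever is immediately available, up to 10.
    batch = consumer.get_messages(count=10, block=False)

    # Blocking batch with a deadline: wait at most 5 seconds in total; the
    # loop shrinks the remaining timeout after each empty fetch.
    batch = consumer.get_messages(count=100, block=True, timeout=5)

    # Single-message form used internally by the iterator; returns None if
    # the internal queue is still empty after one fetch attempt.
    msg = consumer.get_message(block=True, timeout=0.1)
    if msg is not None:
        print msg.offset, msg.message.value

    # Iteration exits once no message arrives for iter_timeout seconds.
    for msg in consumer:
        print msg.offset, msg.message.value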
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py  269
1 file changed, 140 insertions(+), 129 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 226700e..eba2912 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -5,8 +5,8 @@ from itertools import izip_longest, repeat
 import logging
 import time
 from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -24,6 +24,11 @@ AUTO_COMMIT_INTERVAL = 5000
 FETCH_DEFAULT_BLOCK_TIMEOUT = 1
 FETCH_MAX_WAIT_TIME = 100
 FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 
 
 class FetchContext(object):
@@ -34,13 +39,15 @@ class FetchContext(object):
         self.consumer = consumer
         self.block = block
 
-        if block and not timeout:
-            timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
-        self.timeout = timeout * 1000
+        if block:
+            if not timeout:
+                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+            self.timeout = timeout * 1000
 
     def __enter__(self):
         """Set fetch values based on blocking status"""
+        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
         if self.block:
             self.consumer.fetch_max_wait_time = self.timeout
             self.consumer.fetch_min_bytes = 1
@@ -48,9 +55,9 @@ class FetchContext(object):
             self.consumer.fetch_min_bytes = 0
 
     def __exit__(self, type, value, traceback):
-        """Reset values to default"""
-        self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
-        self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+        """Reset values"""
+        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
 
 
 class Consumer(object):
@@ -206,8 +213,14 @@ class SimpleConsumer(Consumer):
                          before a commit
     auto_commit_every_t: default 5000. How much time (in milliseconds) to
                          wait before commit
-    fetch_size_bytes: number of bytes to request in a FetchRequest
+    buffer_size: default 4K. Initial number of bytes to tell Kafka we
+                 have available. This will double as needed.
+    max_buffer_size: default 32K. Max number of bytes to tell Kafka we have
+                     available. None means no limit.
+    iter_timeout: default None. How much time (in seconds) to wait for a
+                  message in the iterator before exiting. None means no
+                  timeout, so it will wait forever.
 
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
     reset one another when one is triggered. These triggers simply call the
@@ -218,12 +231,23 @@ class SimpleConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES):
-
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
+                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+                 iter_timeout=None):
+
+        if max_buffer_size is not None and buffer_size > max_buffer_size:
+            raise ValueError("buffer_size (%d) is greater than "
+                             "max_buffer_size (%d)" %
+                             (buffer_size, max_buffer_size))
+        self.buffer_size = buffer_size
+        self.max_buffer_size = max_buffer_size
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
+        self.iter_timeout = iter_timeout
+        self.queue = Queue()
 
         super(SimpleConsumer, self).__init__(
             client, group, topic,
@@ -267,12 +291,6 @@ class SimpleConsumer(Consumer):
                 reqs.append(OffsetRequest(self.topic, partition, -2, 1))
             elif whence == 2:
                 reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
-                # The API returns back the next available offset
-                # For eg: if the current offset is 18, the API will return
-                # back 19. So, if we have to seek 5 points before, we will
-                # end up going back to 14, instead of 13. Adjust this
-                deltas[partition] -= 1
             else:
                 pass
 
@@ -289,123 +307,111 @@ class SimpleConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages are fetched. If None,
+                 it will block forever.
         """
         messages = []
-        iterator = self.__iter__()
-
-        # HACK: This splits the timeout between available partitions
         if timeout:
-            timeout = timeout * 1.0 / len(self.offsets)
+            max_time = time.time() + timeout
 
-        with FetchContext(self, block, timeout):
-            while count > 0:
-                try:
-                    messages.append(next(iterator))
-                except StopIteration:
-                    break
+        while count > 0 and (timeout is None or timeout > 0):
+            message = self.get_message(block, timeout)
+            if message:
+                messages.append(message)
                 count -= 1
+            else:
+                # Ran out of messages for the last request.
+                if not block:
+                    # If we're not blocking, break.
+                    break
+                if timeout:
+                    # If we're blocking and have a timeout, reduce it to the
+                    # appropriate value
+                    timeout = max_time - time.time()
 
         return messages
 
-    def __iter__(self):
-        """
-        Create an iterator per partition. Iterate through them calling next()
-        until they are all exhausted.
-        """
-        iters = {}
-        for partition, offset in self.offsets.items():
-            iters[partition] = self.__iter_partition__(partition, offset)
+    def get_message(self, block=True, timeout=0.1):
+        if self.queue.empty():
+            # We're out of messages, go grab some more.
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            return self.queue.get_nowait()
+        except Empty:
+            return None
 
-        if len(iters) == 0:
-            return
+    def __iter__(self):
+        if self.iter_timeout is None:
+            timeout = ITER_TIMEOUT_SECONDS
+        else:
+            timeout = self.iter_timeout
 
         while True:
-            if len(iters) == 0:
+            message = self.get_message(True, timeout)
+            if message:
+                yield message
+            elif self.iter_timeout is None:
+                # We did not receive any message yet but we don't have a
+                # timeout, so give up the CPU for a while before trying again
+                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+            else:
+                # Timed out waiting for a message
                 break
 
-            for partition, it in iters.items():
+    def _fetch(self):
+        # Create fetch request payloads for all the partitions
+        requests = []
+        partitions = self.offsets.keys()
+        while partitions:
+            for partition in partitions:
+                requests.append(FetchRequest(self.topic, partition,
+                                             self.offsets[partition],
+                                             self.buffer_size))
+            # Send request
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = set()
+            for resp in responses:
+                partition = resp.partition
                 try:
-                    if self.partition_info:
-                        yield (partition, it.next())
+                    for message in resp.messages:
+                        # Update partition offset
+                        self.offsets[partition] = message.offset + 1
+
+                        # Count, check and commit messages if necessary
+                        self.count_since_commit += 1
+                        self._auto_commit()
+
+                        # Put the message in our queue
+                        if self.partition_info:
+                            self.queue.put((partition, message))
+                        else:
+                            self.queue.put(message)
+                except ConsumerFetchSizeTooSmall, e:
+                    if (self.max_buffer_size is not None and
+                            self.buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise e
+                    if self.max_buffer_size is None:
+                        self.buffer_size *= 2
                     else:
-                        yield it.next()
+                        self.buffer_size = min(self.buffer_size * 2,
+                                               self.max_buffer_size)
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", self.buffer_size)
+                    retry_partitions.add(partition)
+                except ConsumerNoMoreData, e:
+                    log.debug("Iteration was ended by %r", e)
                 except StopIteration:
+                    # Stop iterating through this partition
                     log.debug("Done iterating over partition %s" % partition)
-                    del iters[partition]
-
-                    # skip auto-commit since we didn't yield anything
-                    continue
-
-                # Count, check and commit messages if necessary
-                self.count_since_commit += 1
-                self._auto_commit()
-
-    def __iter_partition__(self, partition, offset):
-        """
-        Iterate over the messages in a partition. Create a FetchRequest
-        to get back a batch of messages, yield them one at a time.
-        After a batch is exhausted, start a new batch unless we've reached
-        the end of this partition.
-        """
-
-        # The offset that is stored in the consumer is the offset that
-        # we have consumed. In subsequent iterations, we are supposed to
-        # fetch the next message (that is from the next offset)
-        # However, for the 0th message, the offset should be as-is.
-        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
-        # problematic, since 0 is offset of a message which we have not yet
-        # consumed.
-        if self.fetch_started[partition]:
-            offset += 1
-
-        fetch_size = self.fetch_min_bytes
-
-        while True:
-            # use MaxBytes = client's bufsize since we're only
-            # fetching one topic + partition
-            req = FetchRequest(
-                self.topic, partition, offset, self.client.bufsize)
-
-            (resp,) = self.client.send_fetch_request(
-                [req],
-                max_wait_time=self.fetch_max_wait_time,
-                min_bytes=fetch_size)
-
-            assert resp.topic == self.topic
-            assert resp.partition == partition
-
-            next_offset = None
-            try:
-                for message in resp.messages:
-                    next_offset = message.offset
-
-                    # update the offset before the message is yielded. This
-                    # is so that the consumer state is not lost in certain
-                    # cases.
-                    #
-                    # For eg: the message is yielded and consumed by the
-                    # caller, but the caller does not come back into the
-                    # generator again. The message will be consumed but the
-                    # status will not be updated in the consumer
-                    self.fetch_started[partition] = True
-                    self.offsets[partition] = message.offset
-                    yield message
-            except ConsumerFetchSizeTooSmall, e:
-                fetch_size *= 1.5
-                log.warn(
-                    "Fetch size too small, increasing to %d (1.5x) and retry",
-                    fetch_size)
-                continue
-            except ConsumerNoMoreData, e:
-                log.debug("Iteration was ended by %r", e)
-
-            if next_offset is None:
-                break
-            else:
-                offset = next_offset + 1
+            partitions = retry_partitions
 
 
 def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     """
@@ -443,8 +449,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
         # indicates a specific number of messages, follow that advice
         count = 0
 
-        for partition, message in consumer:
-            queue.put((partition, message))
+        message = consumer.get_message()
+        if message:
+            queue.put(message)
             count += 1
 
             # We have reached the required size. The controller might have
@@ -454,12 +461,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # can reset the 'start' event
            if count == size.value:
                 pause.wait()
-                break
 
-        # In case we did not receive any message, give up the CPU for
-        # a while before we try again
-        if count == 0:
-            time.sleep(0.1)
+        else:
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
 
     consumer.stop()
 
@@ -504,7 +510,7 @@ class MultiProcessConsumer(Consumer):
 
         # Variables for managing and controlling the data flow from
        # consumer child process to master
-        self.queue = Queue(1024)    # Child consumers dump messages into this
+        self.queue = MPQueue(1024)  # Child consumers dump messages into this
         self.start = Event()        # Indicates the consumers to start fetch
         self.exit = Event()         # Requests the consumers to shutdown
         self.pause = Event()        # Requests the consumers to pause fetch
         self.size = Value('i', 0)   # Indicates the size of the message
@@ -586,8 +592,9 @@ class MultiProcessConsumer(Consumer):
         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If None, and block=True, the API will block infinitely.
-                 If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages are fetched. If None,
+                 it will block forever.
         """
         messages = []
 
         # Give a size hint to the consumers. Each consumer process will fetch
@@ -598,7 +605,10 @@ class MultiProcessConsumer(Consumer):
         self.size.value = count
         self.pause.clear()
 
-        while count > 0:
+        if timeout:
+            max_time = time.time() + timeout
+
+        while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
@@ -618,6 +628,7 @@ class MultiProcessConsumer(Consumer):
             self.count_since_commit += 1
             self._auto_commit()
             count -= 1
+            timeout = None if timeout is None else max_time - time.time()
 
         self.size.value = 0
         self.start.clear()