summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorOmar <omar.ghishan@rd.io>2014-01-13 13:52:02 -0800
committerOmar <omar.ghishan@rd.io>2014-01-13 13:52:02 -0800
commit87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (patch)
treec55c3c5fea1fab6eef77f5213909ed2c2f8acc92 /kafka/consumer.py
parent354fcdbdd9b34b3454b964e6dc0d4a746744bbcd (diff)
parenta0c7141e2cc7399a9472a8169ea5f730f0407386 (diff)
downloadkafka-python-87c7f9dedfc008e3fff7a010cc4e708eeec5bebe.tar.gz
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including: * Allow customizing socket timeouts * Read the correct number of bytes from kafka * Guarantee reading the expected number of bytes from the socket every time * Remove bufsize from client and conn * SimpleConsumer flow changes * Fix some error handling * Add optional upper limit to consumer fetch buffer size * Add and fix unit and integration tests
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py269
1 files changed, 140 insertions, 129 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 226700e..eba2912 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -5,8 +5,8 @@ from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -24,6 +24,11 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
class FetchContext(object):
@@ -34,13 +39,15 @@ class FetchContext(object):
self.consumer = consumer
self.block = block
- if block and not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
- self.timeout = timeout * 1000
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
@@ -48,9 +55,9 @@ class FetchContext(object):
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
- """Reset values to default"""
- self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class Consumer(object):
@@ -206,8 +213,14 @@ class SimpleConsumer(Consumer):
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
-
fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -218,12 +231,23 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
-
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
+
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
super(SimpleConsumer, self).__init__(
client, group, topic,
@@ -267,12 +291,6 @@ class SimpleConsumer(Consumer):
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- # The API returns back the next available offset
- # For eg: if the current offset is 18, the API will return
- # back 19. So, if we have to seek 5 points before, we will
- # end up going back to 14, instead of 13. Adjust this
- deltas[partition] -= 1
else:
pass
@@ -289,123 +307,111 @@ class SimpleConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
if timeout:
- timeout = timeout * 1.0 / len(self.offsets)
+ max_time = time.time() + timeout
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ while count > 0 and (timeout is None or timeout > 0):
+ message = self.get_message(block, timeout)
+ if message:
+ messages.append(message)
count -= 1
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
return messages
- def __iter__(self):
- """
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
- """
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
+ def get_message(self, block=True, timeout=0.1):
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ return self.queue.get_nowait()
+ except Empty:
+ return None
- if len(iters) == 0:
- return
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
while True:
- if len(iters) == 0:
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+ else:
+ # Timed out waiting for a message
break
- for partition, it in iters.items():
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ requests = []
+ partitions = self.offsets.keys()
+ while partitions:
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition,
+ self.offsets[partition],
+ self.buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = set()
+ for resp in responses:
+ partition = resp.partition
try:
- if self.partition_info:
- yield (partition, it.next())
+ for message in resp.messages:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ # Put the message in our queue
+ if self.partition_info:
+ self.queue.put((partition, message))
+ else:
+ self.queue.put(message)
+ except ConsumerFetchSizeTooSmall, e:
+ if (self.max_buffer_size is not None and
+ self.buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise e
+ if self.max_buffer_size is None:
+ self.buffer_size *= 2
else:
- yield it.next()
+ self.buffer_size = max(self.buffer_size * 2,
+ self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", self.buffer_size)
+ retry_partitions.add(partition)
+ except ConsumerNoMoreData, e:
+ log.debug("Iteration was ended by %r", e)
except StopIteration:
+ # Stop iterating through this partition
log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
-
- # skip auto-commit since we didn't yield anything
- continue
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
-
- fetch_size = self.fetch_min_bytes
-
- while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
-
- next_offset = None
- try:
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
- except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
- except ConsumerNoMoreData, e:
- log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
- else:
- offset = next_offset + 1
-
+ partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
@@ -443,8 +449,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -454,12 +461,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
@@ -504,7 +510,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -586,8 +592,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
@@ -598,7 +605,10 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout:
+ max_time = time.time() + timeout
+
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -618,6 +628,7 @@ class MultiProcessConsumer(Consumer):
self.count_since_commit += 1
self._auto_commit()
count -= 1
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()