summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py337
1 files changed, 195 insertions, 142 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f2898ad..28b53ec 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,10 +1,11 @@
-from collections import defaultdict
+from __future__ import absolute_import
+
from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -22,6 +23,11 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
class FetchContext(object):
@@ -32,13 +38,15 @@ class FetchContext(object):
self.consumer = consumer
self.block = block
- if block and not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
- self.timeout = timeout * 1000
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
@@ -46,9 +54,9 @@ class FetchContext(object):
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
- """Reset values to default"""
- self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class Consumer(object):
@@ -67,7 +75,7 @@ class Consumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client._load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic)
self.offsets = {}
if not partitions:
@@ -204,8 +212,14 @@ class SimpleConsumer(Consumer):
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
-
fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -216,13 +230,10 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
-
- self.partition_info = False # Do not return partition info in msgs
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_started = defaultdict(bool) # defaults to false
-
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -230,6 +241,23 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = fetch_size_bytes
+ self.fetch_offsets = self.offsets.copy()
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
+
+ def __repr__(self):
+ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+ (self.group, self.topic, str(self.offsets.keys()))
+
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -265,12 +293,6 @@ class SimpleConsumer(Consumer):
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- # The API returns back the next available offset
- # For eg: if the current offset is 18, the API will return
- # back 19. So, if we have to seek 5 points before, we will
- # end up going back to 14, instead of 13. Adjust this
- deltas[partition] -= 1
else:
pass
@@ -281,128 +303,148 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ self.queue = Queue()
+
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
- timeout = timeout * 1.0 / len(self.offsets)
-
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
count -= 1
-
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout is not None:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
return messages
- def __iter__(self):
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
"""
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
"""
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
-
- if len(iters) == 0:
- return
-
- while True:
- if len(iters) == 0:
- break
-
- for partition, it in iters.items():
- try:
- if self.partition_info:
- yield (partition, it.next())
- else:
- yield it.next()
- except StopIteration:
- log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ partition, message = self.queue.get_nowait()
- # skip auto-commit since we didn't yield anything
- continue
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
+ except Empty:
+ return None
- fetch_size = self.fetch_min_bytes
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
-
- next_offset = None
- try:
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
- except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
- except ConsumerNoMoreData, e:
- log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
else:
- offset = next_offset + 1
+ # Timed out waiting for a message
+ break
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ requests = []
+ partitions = self.fetch_offsets.keys()
+ while partitions:
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition,
+ self.fetch_offsets[partition],
+ self.buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = set()
+ for resp in responses:
+ partition = resp.partition
+ try:
+ for message in resp.messages:
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
+ except ConsumerFetchSizeTooSmall, e:
+ if (self.max_buffer_size is not None and
+ self.buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise e
+ if self.max_buffer_size is None:
+ self.buffer_size *= 2
+ else:
+ self.buffer_size = max(self.buffer_size * 2,
+ self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", self.buffer_size)
+ retry_partitions.add(partition)
+ except ConsumerNoMoreData, e:
+ log.debug("Iteration was ended by %r", e)
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
+ partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
@@ -440,8 +482,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -451,12 +494,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
@@ -501,7 +543,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -535,6 +577,10 @@ class MultiProcessConsumer(Consumer):
proc.start()
self.procs.append(proc)
+ def __repr__(self):
+ return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
+ (self.group, self.topic, len(self.procs))
+
def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
@@ -568,12 +614,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -583,8 +628,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
@@ -595,7 +641,11 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -609,15 +659,18 @@ class MultiProcessConsumer(Consumer):
break
messages.append(message)
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
- self.count_since_commit += 1
- self._auto_commit()
+ new_offsets[partition] = message.offset + 1
count -= 1
+ if timeout is not None:
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
self.pause.set()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
return messages