Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- kafka/consumer.py | 182
1 file changed, 70 insertions(+), 112 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 600c8c7..a5a3e26 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,8 +3,8 @@ from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -227,6 +227,7 @@ class SimpleConsumer(Consumer):
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
+ self.queue = Queue(buffer_size)
super(SimpleConsumer, self).__init__(
client, group, topic,
@@ -292,122 +293,75 @@ class SimpleConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified time (in seconds)
+ until count messages are fetched. If None, it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
if timeout:
- timeout = timeout * 1.0 / len(self.offsets)
+ max_time = time.time() + timeout
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ while count > 0 and (timeout is None or timeout > 0):
+ message = self.get_message(block, timeout)
+ if message:
+ messages.append(message)
count -= 1
+ else:
+ # Ran out of messages for the last request. If we're not blocking, break.
+ if not block:
+ break
+ if timeout:
+ timeout = max_time - time.time()
return messages
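
For illustration, a caller might drive the reworked get_messages() like this
(the consumer instance and the process() callback are placeholders, not part
of the patch):

    # Fetch up to 10 messages, blocking for at most 5 seconds in total;
    # fewer messages are returned if the deadline expires first.
    batch = consumer.get_messages(count=10, block=True, timeout=5)
    for msg in batch:
        process(msg)   # process() is an illustrative stand-in
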
- def __iter__(self):
- """
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
- """
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
-
- if len(iters) == 0:
- return
-
- while True:
- if len(iters) == 0:
- break
-
- for partition, it in iters.items():
- try:
- if self.partition_info:
- yield (partition, it.next())
- else:
- yield it.next()
- except StopIteration:
- log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
-
- # skip auto-commit since we didn't yield anything
- continue
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
-
- fetch_size = self.fetch_min_bytes
+ def get_message(self, block=True, timeout=0.1):
+ if self.queue.empty():
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ return self.queue.get_nowait()
+ except Empty:
+ return None
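
get_message() drains a local queue, triggering a broker fetch only when that
queue is empty, and returns None (instead of raising) when nothing arrives in
time. A minimal polling loop on top of it might look like this (the timeout
value and handle() are illustrative):

    while True:
        msg = consumer.get_message(block=True, timeout=0.5)
        if msg is None:
            continue        # nothing arrived within the timeout
        handle(msg)         # handle() is an illustrative stand-in
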
+ def __iter__(self):
while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.buffer_size)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
+ message = self.get_message(True, 100)
+ if message:
+ yield message
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(0.1)
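
With partition_info enabled the generator yields (partition, message) tuples
instead of bare messages. A sketch, assuming the provide_partition_info()
helper on the Consumer base class:

    consumer.provide_partition_info()
    for partition, message in consumer:   # sleeps 0.1s whenever idle
        print partition, message
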
- next_offset = None
+ def _fetch(self):
+ requests = []
+ partitions = self.offsets.keys()
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+ for resp in responses:
+ partition = resp.partition
try:
for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
+ self.offsets[partition] = message.offset + 1
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+ if self.partition_info:
+ self.queue.put((partition, message))
+ else:
+ self.queue.put(message)
except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
+ self.buffer_size *= 2
+ log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
- else:
- offset = next_offset + 1
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
@@ -446,8 +400,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -457,11 +412,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
time.sleep(0.1)
consumer.stop()
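
The start/exit/pause Events and the size Value coordinate the parent with its
child consumers. A stripped-down, runnable sketch of that handshake (the
worker body is a stand-in for _mp_consume, not kafka-python code):

    from multiprocessing import Process, Event, Value
    import time

    def worker(start, stop, counter):
        # Consume only while 'start' is set; quit for good when 'stop' is set
        while not stop.is_set():
            if not start.is_set():
                time.sleep(0.1)
                continue
            counter.value += 1    # pretend we consumed one message
            time.sleep(0.01)

    if __name__ == '__main__':
        start, stop, counter = Event(), Event(), Value('i', 0)
        p = Process(target=worker, args=(start, stop, counter))
        p.start()
        start.set()               # ask the child to consume
        time.sleep(0.5)
        start.clear()             # pause it
        stop.set()                # shut it down
        p.join()
        print counter.value
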
@@ -507,7 +461,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
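
The import rename exists because two different queues are now in play:
Queue.Queue is thread-safe but local to one process (it backs SimpleConsumer),
while multiprocessing.Queue is pipe-backed and crosses fork(). A Python 2
sketch of the distinction:

    from multiprocessing import Process, Queue as MPQueue

    shared_q = MPQueue(1024)   # usable across child processes

    def child(q):
        q.put('hello from child')

    if __name__ == '__main__':
        p = Process(target=child, args=(shared_q,))
        p.start(); p.join()
        print shared_q.get()   # arrives across the process boundary;
                               # a Queue.Queue put in the child would not
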
@@ -589,8 +543,8 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified time (in seconds)
+ until count messages are fetched. If None, it will block forever.
"""
messages = []
@@ -601,7 +555,11 @@
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout:
+ max_time = time.time() + timeout
+
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -621,6 +578,7 @@ class MultiProcessConsumer(Consumer):
self.count_since_commit += 1
self._auto_commit()
count -= 1
+ if timeout:
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
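
End to end, the public API is unchanged; only the deadline handling is new.
Illustrative usage (the client, group and topic values are placeholders):

    consumer = MultiProcessConsumer(client, "my-group", "my-topic")
    # Ask for up to 100 messages, waiting at most 2 seconds overall;
    # fewer are returned if the deadline expires first.
    batch = consumer.get_messages(count=100, block=True, timeout=2)
    consumer.stop()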