summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-25 11:36:34 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-25 11:36:34 +0530
commitc7dfebaa8d9ee6bc6af8f1d03557ed1e089a0f65 (patch)
tree6e40591778a622e9e51033e563a533d20331db12
parente85190e6f6d8ea554df96b2b445924591bd561b6 (diff)
downloadkafka-python-c7dfebaa8d9ee6bc6af8f1d03557ed1e089a0f65.tar.gz
Added more documentation and clean up duplicate code
-rw-r--r--kafka/consumer.py162
1 files changed, 76 insertions, 86 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 7f67cf2..2b77d00 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -19,6 +19,14 @@ AUTO_COMMIT_INTERVAL = 5000
class Consumer(object):
+ """
+ Base class to be used by other consumers. Not to be used directly
+
+ This base class provides logic for
+ * initialization and fetching metadata of partitions
+ * Auto-commit logic
+ * APIs for fetching pending message count
+ """
def __init__(self, client, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
@@ -28,7 +36,6 @@ class Consumer(object):
self.group = group
self.client._load_metadata_for_topics(topic)
self.offsets = {}
- self.partition_info = False
if not partitions:
partitions = self.client.topic_partitions[topic]
@@ -69,9 +76,6 @@ class Consumer(object):
for partition in partitions:
self.offsets[partition] = 0
- def provide_partition_info(self):
- self.partition_info = True
-
def _timed_commit(self):
"""
Commit offsets as part of timer
@@ -157,7 +161,8 @@ class Consumer(object):
class SimpleConsumer(Consumer):
"""
- A simple consumer implementation that consumes all partitions for a topic
+ A simple consumer implementation that consumes all/specified partitions
+ for a topic
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
@@ -175,17 +180,25 @@ class SimpleConsumer(Consumer):
reset one another when one is triggered. These triggers simply call the
commit method on this class. A manual call to commit will also reset
these triggers
-
"""
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+ # Indicates if partition info will be returned in messages
+ self.partition_info = False
+
super(SimpleConsumer, self).__init__(client, group, topic,
auto_commit, partitions,
auto_commit_every_n,
auto_commit_every_t)
+ def provide_partition_info(self):
+ """
+ Indicates that partition info must be returned by the consumer
+ """
+ self.partition_info = True
+
def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
@@ -229,64 +242,6 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
- def _timed_commit(self):
- """
- Commit offsets as part of timer
- """
- self.commit()
-
- # Once the commit is done, start the timer again
- self.commit_timer.start()
-
- def commit(self, partitions=None):
- """
- Commit offsets for this consumer
-
- partitions: list of partitions to commit, default is to commit
- all of them
- """
-
- # short circuit if nothing happened
- if self.count_since_commit == 0:
- return
-
- with self.commit_lock:
- reqs = []
- if not partitions: # commit all partitions
- partitions = self.offsets.keys()
-
- for partition in partitions:
- offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: "
- "group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
-
- reqs.append(OffsetCommitRequest(self.topic, partition,
- offset, None))
-
- resps = self.client.send_offset_commit_request(self.group, reqs)
- for resp in resps:
- assert resp.error == 0
-
- self.count_since_commit = 0
-
- def _auto_commit(self):
- """
- Check if we have to commit based on number of messages and commit
- """
-
- # Check if we are supposed to do an auto-commit
- if not self.auto_commit or self.auto_commit_every_n is None:
- return
-
- if self.count_since_commit > self.auto_commit_every_n:
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
- self.commit_timer.start()
- else:
- self.commit()
-
def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next()
@@ -354,7 +309,7 @@ class SimpleConsumer(Consumer):
class MultiProcessConsumer(Consumer):
"""
A consumer implementation that consumes partitions for a topic in
- parallel from multiple partitions
+ parallel using multiple processes
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
@@ -375,7 +330,6 @@ class MultiProcessConsumer(Consumer):
reset one another when one is triggered. These triggers simply call the
commit method on this class. A manual call to commit will also reset
these triggers
-
"""
def __init__(self, client, group, topic, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
@@ -392,55 +346,70 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
self.queue = Queue() # Child consumers dump messages into this
- self.start = Event() # Indicates the consumers to start
+ self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
- self.pause = Event() # Requests the consumers to pause
+ self.pause = Event() # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
partitions = self.offsets.keys()
# If unspecified, start one consumer per partition
+ # The logic below ensures that
+ # * we do not cross the num_procs limit
+ # * we have an even distribution of partitions among processes
if not partitions_per_proc:
partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
if partitions_per_proc < num_procs * 0.5:
partitions_per_proc += 1
- self.procs = []
+ # The final set of chunks
+ chunks = map(None, *[iter(partitions)] * int(partitions_per_proc))
- for slices in map(None, *[iter(partitions)] * int(partitions_per_proc)):
- proc = Process(target=_self._consume, args=(slices,))
+ self.procs = []
+ for chunk in chunks:
+ proc = Process(target=_self._consume, args=(chunk,))
proc.daemon = True
proc.start()
self.procs.append(proc)
- # We do not need a consumer instance anymore
- consumer.stop()
-
- def _consume(self, slices):
+ def _consume(self, partitions):
+ """
+ A child process worker which consumes messages based on the
+ notifications given by the controller process
+ """
# We will start consumers without auto-commit. Auto-commit will be
- # done by the master process.
+ # done by the master controller process.
consumer = SimpleConsumer(self.client, self.group, self.topic,
- partitions=slices,
+ partitions=partitions,
auto_commit=False,
- auto_commit_every_n=0,
+ auto_commit_every_n=None,
auto_commit_every_t=None)
# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
while True:
+ # Wait till the controller indicates us to start consumption
self.start.wait()
+
+ # If we are asked to quit, do so
if self.exit.isSet():
break
+ # Consume messages and add them to the queue. If the controller
+ # indicates a specific number of messages, follow that advice
count = 0
+
for partition, message in consumer:
self.queue.put((partition, message))
count += 1
- # We have reached the required size. The master might have
- # more than what he needs. Wait for a while
+ # We have reached the required size. The controller might have
+ # more than what he needs. Wait for a while.
+ # Without this logic, it is possible that we run into a big
+ # loop consuming all available messages before the controller
+ # can reset the 'start' event
if count == self.size.value:
self.pause.wait()
break
@@ -450,6 +419,7 @@ class MultiProcessConsumer(Consumer):
def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
+ self.pause.set()
self.start.set()
for proc in self.procs:
@@ -457,13 +427,23 @@ class MultiProcessConsumer(Consumer):
proc.terminate()
def __iter__(self):
- # Trigger the consumer procs to start off
+ """
+ Iterator to consume the messages available on this consumer
+ """
+ # Trigger the consumer procs to start off.
+ # We will iterate till there are no more messages available
self.size.value = 0
self.start.set()
self.pause.set()
- while not self.queue.empty():
- partition, message = self.queue.get()
+ while True:
+ try:
+ # We will block for a small while so that the consumers get
+ # a chance to run and put some messages in the queue
+ partition, message = self.queue.get(block=True, timeout=0.1)
+ except Queue.Empty:
+ break
+
yield message
# Count, check and commit messages if necessary
@@ -474,9 +454,20 @@ class MultiProcessConsumer(Consumer):
self.start.clear()
def get_messages(self, count=1, block=True, timeout=10):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If None, and block=True, the API will block infinitely.
+ If >0, API will block for specified time (in seconds)
+ """
messages = []
- # Give a size hint to the consumers
+ # Give a size hint to the consumers. Each consumer process will fetch
+ # a maximum of "count" messages. This will fetch more messages than
+ # necessary, but these will not be committed to kafka. Also, the extra
+ # messages can be provided in subsequent runs
self.size.value = count
self.pause.clear()
@@ -484,7 +475,7 @@ class MultiProcessConsumer(Consumer):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
- # messages when the user might need only two
+ # messages when the user might need only a few
if self.queue.empty():
self.start.set()
@@ -502,7 +493,6 @@ class MultiProcessConsumer(Consumer):
count -= 1
self.size.value = 0
-
self.start.clear()
self.pause.set()