Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 162 |
1 file changed, 76 insertions, 86 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 7f67cf2..2b77d00 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -19,6 +19,14 @@ AUTO_COMMIT_INTERVAL = 5000
 
 
 class Consumer(object):
+    """
+    Base class to be used by other consumers. Not to be used directly
+
+    This base class provides logic for
+    * initialization and fetching metadata of partitions
+    * Auto-commit logic
+    * APIs for fetching pending message count
+    """
     def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
@@ -28,7 +36,6 @@ class Consumer(object):
         self.group = group
         self.client._load_metadata_for_topics(topic)
         self.offsets = {}
-        self.partition_info = False
 
         if not partitions:
             partitions = self.client.topic_partitions[topic]
@@ -69,9 +76,6 @@ class Consumer(object):
             for partition in partitions:
                 self.offsets[partition] = 0
 
-    def provide_partition_info(self):
-        self.partition_info = True
-
     def _timed_commit(self):
         """
         Commit offsets as part of timer
@@ -157,7 +161,8 @@ class Consumer(object):
 
 class SimpleConsumer(Consumer):
     """
-    A simple consumer implementation that consumes all partitions for a topic
+    A simple consumer implementation that consumes all/specified partitions
+    for a topic
 
     client: a connected KafkaClient
     group: a name for this consumer, used for offset storage and must be unique
@@ -175,17 +180,25 @@ class SimpleConsumer(Consumer):
     reset one another when one is triggered. These triggers simply call the
     commit method on this class. A manual call to commit will also reset
     these triggers
-
     """
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+        # Indicates if partition info will be returned in messages
+        self.partition_info = False
+
         super(SimpleConsumer, self).__init__(client, group, topic,
                                              auto_commit, partitions,
                                              auto_commit_every_n,
                                              auto_commit_every_t)
 
+    def provide_partition_info(self):
+        """
+        Indicates that partition info must be returned by the consumer
+        """
+        self.partition_info = True
+
     def stop(self):
         if self.commit_timer is not None:
             self.commit_timer.stop()
@@ -229,64 +242,6 @@ class SimpleConsumer(Consumer):
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
-    def _timed_commit(self):
-        """
-        Commit offsets as part of timer
-        """
-        self.commit()
-
-        # Once the commit is done, start the timer again
-        self.commit_timer.start()
-
-    def commit(self, partitions=None):
-        """
-        Commit offsets for this consumer
-
-        partitions: list of partitions to commit, default is to commit
-                    all of them
-        """
-
-        # short circuit if nothing happened
-        if self.count_since_commit == 0:
-            return
-
-        with self.commit_lock:
-            reqs = []
-            if not partitions:  # commit all partitions
-                partitions = self.offsets.keys()
-
-            for partition in partitions:
-                offset = self.offsets[partition]
-                log.debug("Commit offset %d in SimpleConsumer: "
-                          "group=%s, topic=%s, partition=%s" %
-                          (offset, self.group, self.topic, partition))
-
-                reqs.append(OffsetCommitRequest(self.topic, partition,
-                                                offset, None))
-
-            resps = self.client.send_offset_commit_request(self.group, reqs)
-            for resp in resps:
-                assert resp.error == 0
-
-            self.count_since_commit = 0
-
-    def _auto_commit(self):
-        """
-        Check if we have to commit based on number of messages and commit
-        """
-
-        # Check if we are supposed to do an auto-commit
-        if not self.auto_commit or self.auto_commit_every_n is None:
-            return
-
-        if self.count_since_commit > self.auto_commit_every_n:
-            if self.commit_timer is not None:
-                self.commit_timer.stop()
-                self.commit()
-                self.commit_timer.start()
-            else:
-                self.commit()
-
     def __iter__(self):
         """
         Create an iterate per partition. Iterate through them calling next()
@@ -354,7 +309,7 @@ class SimpleConsumer(Consumer):
 class MultiProcessConsumer(Consumer):
     """
     A consumer implementation that consumes partitions for a topic in
-    parallel from multiple partitions
+    parallel using multiple processes
 
     client: a connected KafkaClient
     group: a name for this consumer, used for offset storage and must be unique
@@ -375,7 +330,6 @@ class MultiProcessConsumer(Consumer):
     reset one another when one is triggered. These triggers simply call the
     commit method on this class. A manual call to commit will also reset
     these triggers
-
     """
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
@@ -392,55 +346,70 @@ class MultiProcessConsumer(Consumer):
         # Variables for managing and controlling the data flow from
         # consumer child process to master
         self.queue = Queue()    # Child consumers dump messages into this
-        self.start = Event()    # Indicates the consumers to start
+        self.start = Event()    # Indicates the consumers to start fetch
         self.exit = Event()     # Requests the consumers to shutdown
-        self.pause = Event()    # Requests the consumers to pause
+        self.pause = Event()    # Requests the consumers to pause fetch
         self.size = Value('i', 0)    # Indicator of number of messages to fetch
 
         partitions = self.offsets.keys()
 
         # If unspecified, start one consumer per partition
+        # The logic below ensures that
+        # * we do not cross the num_procs limit
+        # * we have an even distribution of partitions among processes
         if not partitions_per_proc:
             partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
             if partitions_per_proc < num_procs * 0.5:
                 partitions_per_proc += 1
 
-        self.procs = []
+        # The final set of chunks
+        chunks = map(None, *[iter(partitions)] * int(partitions_per_proc))
 
-        for slices in map(None, *[iter(partitions)] * int(partitions_per_proc)):
-            proc = Process(target=self._consume, args=(slices,))
+        self.procs = []
+        for chunk in chunks:
+            proc = Process(target=self._consume, args=(chunk,))
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
 
-        # We do not need a consumer instance anymore
-        consumer.stop()
-
-    def _consume(self, slices):
+    def _consume(self, partitions):
+        """
+        A child process worker which consumes messages based on the
+        notifications given by the controller process
+        """
         # We will start consumers without auto-commit. Auto-commit will be
-        # done by the master process.
+        # done by the master controller process.
         consumer = SimpleConsumer(self.client, self.group, self.topic,
-                                  partitions=slices,
+                                  partitions=partitions,
                                   auto_commit=False,
-                                  auto_commit_every_n=0,
+                                  auto_commit_every_n=None,
                                   auto_commit_every_t=None)
 
         # Ensure that the consumer provides the partition information
         consumer.provide_partition_info()
 
         while True:
+            # Wait till the controller indicates us to start consumption
            self.start.wait()
+
+            # If we are asked to quit, do so
            if self.exit.isSet():
                break
 
+            # Consume messages and add them to the queue. If the controller
+            # indicates a specific number of messages, follow that advice
            count = 0
+
            for partition, message in consumer:
                self.queue.put((partition, message))
                count += 1
 
-                # We have reached the required size. The master might have
-                # more than what he needs. Wait for a while
+                # We have reached the required size. The controller might have
+                # more than what he needs. Wait for a while.
+                # Without this logic, it is possible that we run into a big
+                # loop consuming all available messages before the controller
+                # can reset the 'start' event
                if count == self.size.value:
                    self.pause.wait()
                    break
@@ -450,6 +419,7 @@ class MultiProcessConsumer(Consumer):
     def stop(self):
         # Set exit and start off all waiting consumers
         self.exit.set()
+        self.pause.set()
         self.start.set()
 
         for proc in self.procs:
@@ -457,13 +427,23 @@
             proc.terminate()
 
     def __iter__(self):
-        # Trigger the consumer procs to start off
+        """
+        Iterator to consume the messages available on this consumer
+        """
+        # Trigger the consumer procs to start off.
+        # We will iterate till there are no more messages available
         self.size.value = 0
         self.start.set()
         self.pause.set()
 
-        while not self.queue.empty():
-            partition, message = self.queue.get()
+        while True:
+            try:
+                # We will block for a small while so that the consumers get
+                # a chance to run and put some messages in the queue
+                partition, message = self.queue.get(block=True, timeout=0.1)
+            except Queue.Empty:
+                break
+
             yield message
 
             # Count, check and commit messages if necessary
@@ -474,9 +454,20 @@
         self.start.clear()
 
     def get_messages(self, count=1, block=True, timeout=10):
+        """
+        Fetch the specified number of messages
+
+        count: Indicates the maximum number of messages to be fetched
+        block: If True, the API will block till some messages are fetched.
+        timeout: If None, and block=True, the API will block infinitely.
+                 If >0, API will block for specified time (in seconds)
+        """
         messages = []
 
-        # Give a size hint to the consumers
+        # Give a size hint to the consumers. Each consumer process will fetch
+        # a maximum of "count" messages. This will fetch more messages than
+        # necessary, but these will not be committed to kafka. Also, the extra
+        # messages can be provided in subsequent runs
         self.size.value = count
         self.pause.clear()
 
@@ -484,7 +475,7 @@
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
             # go into overdrive and keep consuming thousands of
-            # messages when the user might need only two
+            # messages when the user might need only a few
             if self.queue.empty():
                 self.start.set()
 
@@ -502,7 +493,6 @@
                 count -= 1
 
         self.size.value = 0
-        self.start.clear()
         self.pause.set()