author | mrtheb <mrlabbe@gmail.com> | 2014-01-31 21:06:20 -0500
committer | mrtheb <mrlabbe@gmail.com> | 2014-01-31 21:06:20 -0500
commit | 72fdf391db112c0d7642371c8284d5dbd3b39fbd (patch)
tree | df7cd38b46f65d8fe532d65064bb3588ffebaa74 /kafka
parent | 8bcf0f0940a94ddb2ee44a6edb333ca0d8595913 (diff)
parent | 4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
download | kafka-python-72fdf391db112c0d7642371c8284d5dbd3b39fbd.tar.gz
Merge branch 'master' into develop
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer.py | 109
-rw-r--r-- | kafka/producer.py | 79 |
2 files changed, 114 insertions, 74 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 5be1bef..28b53ec 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,5 @@
 from __future__ import absolute_import

-from collections import defaultdict
 from itertools import izip_longest, repeat
 import logging
 import time
@@ -235,6 +234,12 @@ class SimpleConsumer(Consumer):
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None):
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)

         if max_buffer_size is not None and buffer_size > max_buffer_size:
             raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ class SimpleConsumer(Consumer):
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
-        self.fetch_started = defaultdict(bool)  # defaults to false
+        self.fetch_offsets = self.offsets.copy()
         self.iter_timeout = iter_timeout
         self.queue = Queue()

-        super(SimpleConsumer, self).__init__(
-            client, group, topic,
-            partitions=partitions,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
     def __repr__(self):
         return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
             (self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ class SimpleConsumer(Consumer):
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)

+        # Reset queue and fetch offsets since they are invalid
+        self.fetch_offsets = self.offsets.copy()
+        self.queue = Queue()
+
     def get_messages(self, count=1, block=True, timeout=0.1):
         """
         Fetch the specified number of messages
@@ -316,33 +318,69 @@ class SimpleConsumer(Consumer):
                  it will block forever.
         """
         messages = []
-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout

+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
-            message = self.get_message(block, timeout)
-            if message:
-                messages.append(message)
+            result = self._get_message(block, timeout, get_partition_info=True,
+                                       update_offset=False)
+            if result:
+                partition, message = result
+                if self.partition_info:
+                    messages.append(result)
+                else:
+                    messages.append(message)
+                new_offsets[partition] = message.offset + 1
                 count -= 1
             else:
                 # Ran out of messages for the last request.
                 if not block:
                     # If we're not blocking, break.
                     break
-                if timeout:
+                if timeout is not None:
                     # If we're blocking and have a timeout, reduce it to the
                     # appropriate value
                     timeout = max_time - time.time()

+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
         return messages

-    def get_message(self, block=True, timeout=0.1):
+    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+        return self._get_message(block, timeout, get_partition_info)
+
+    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+                     update_offset=True):
+        """
+        If no messages can be fetched, returns None.
+        If get_partition_info is None, it defaults to self.partition_info
+        If get_partition_info is True, returns (partition, message)
+        If get_partition_info is False, returns message
+        """
         if self.queue.empty():
             # We're out of messages, go grab some more.
             with FetchContext(self, block, timeout):
                 self._fetch()
         try:
-            return self.queue.get_nowait()
+            partition, message = self.queue.get_nowait()
+
+            if update_offset:
+                # Update partition offset
+                self.offsets[partition] = message.offset + 1
+
+                # Count, check and commit messages if necessary
+                self.count_since_commit += 1
+                self._auto_commit()
+
+            if get_partition_info is None:
+                get_partition_info = self.partition_info
+            if get_partition_info:
+                return partition, message
+            else:
+                return message
         except Empty:
             return None

@@ -367,11 +405,11 @@ class SimpleConsumer(Consumer):
     def _fetch(self):
         # Create fetch request payloads for all the partitions
         requests = []
-        partitions = self.offsets.keys()
+        partitions = self.fetch_offsets.keys()
         while partitions:
             for partition in partitions:
                 requests.append(FetchRequest(self.topic, partition,
-                                             self.offsets[partition],
+                                             self.fetch_offsets[partition],
                                              self.buffer_size))
             # Send request
             responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@
             partition = resp.partition
             try:
                 for message in resp.messages:
-                    # Update partition offset
-                    self.offsets[partition] = message.offset + 1
-
-                    # Count, check and commit messages if necessary
-                    self.count_since_commit += 1
-                    self._auto_commit()
-
                     # Put the message in our queue
-                    if self.partition_info:
-                        self.queue.put((partition, message))
-                    else:
-                        self.queue.put(message)
+                    self.queue.put((partition, message))
+                    self.fetch_offsets[partition] = message.offset + 1
             except ConsumerFetchSizeTooSmall, e:
                 if (self.max_buffer_size is not None and
                         self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ class MultiProcessConsumer(Consumer):
                 break

             # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
+            self.offsets[partition] = message.offset + 1
             self.start.clear()
-            yield message
-
             self.count_since_commit += 1
             self._auto_commit()
+            yield message

         self.start.clear()

@@ -613,9 +641,10 @@
         self.size.value = count
         self.pause.clear()

-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout

+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@
                 break

             messages.append(message)
-
-            # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
-            self.count_since_commit += 1
-            self._auto_commit()
+            new_offsets[partition] = message.offset + 1
             count -= 1
-            timeout = max_time - time.time()
+            if timeout is not None:
+                timeout = max_time - time.time()

         self.size.value = 0
         self.start.clear()
         self.pause.set()

+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
+
         return messages
diff --git a/kafka/producer.py b/kafka/producer.py
index 6b624f2..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process

-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import create_message

@@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20

 STOP_ASYNC_PRODUCER = -1

-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
     # timeout is reached
     while count > 0 and timeout >= 0:
         try:
-            partition, msg = queue.get(timeout=timeout)
+            topic_partition, msg = queue.get(timeout=timeout)
+
         except Empty:
             break

         # Check if the controller has requested us to stop
-        if partition == STOP_ASYNC_PRODUCER:
+        if topic_partition == STOP_ASYNC_PRODUCER:
             stop = True
             break

         # Adjust the timeout to match the remaining period
         count -= 1
         timeout = send_at - time.time()
-        msgset[partition].append(msg)
+        msgset[topic_partition].append(msg)

     # Send collected requests upstream
     reqs = []
-    for partition, messages in msgset.items():
-        req = ProduceRequest(topic, partition, messages)
+    for topic_partition, messages in msgset.items():
+        req = ProduceRequest(topic_partition.topic,
+                             topic_partition.partition,
+                             messages)
         reqs.append(req)

     try:
@@ -78,7 +81,6 @@ class Producer(object):

     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
     async - If set to true, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
     req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@ class Producer(object):
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
-                                args=(self.topic,
-                                      self.queue,
+                                args=(self.queue,
                                       self.client.copy(),
                                       batch_send_every_t,
                                       batch_send_every_n,
@@ -131,17 +132,18 @@ class Producer(object):
             self.proc.daemon = True
             self.proc.start()

-    def send_messages(self, partition, *msg):
+    def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
         """
         if self.async:
             for m in msg:
-                self.queue.put((partition, create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition),
+                                create_message(m)))
             resp = []
         else:
             messages = [create_message(m) for m in msg]
-            req = ProduceRequest(self.topic, partition, messages)
+            req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                         timeout=self.ack_timeout)
@@ -169,7 +171,6 @@ class SimpleProducer(Producer):

     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
     async - If True, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
     req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, async=False,
+    def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-        self.next_partition = cycle(client.topic_partitions[topic])
-
+        self.partition_cycles = {}
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)

-    def send_messages(self, *msg):
-        partition = self.next_partition.next()
-        return super(SimpleProducer, self).send_messages(partition, *msg)
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+        return self.partition_cycles[topic].next()
+
+    def send_messages(self, topic, *msg):
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(topic, partition, *msg)

     def __repr__(self):
-        return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<SimpleProducer batch=%s>' % self.async


 class KeyedProducer(Producer):
@@ -209,7 +214,6 @@ class KeyedProducer(Producer):

     Args:
     client - The kafka client instance
-    topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
         to send the message to. Must be derived from Partitioner
     async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, partitioner=None, async=False,
+    def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-
         if not partitioner:
             partitioner = HashedPartitioner
-
-        self.partitioner = partitioner(client.topic_partitions[topic])
+        self.partitioner_class = partitioner
+        self.partitioners = {}

         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)

-    def send(self, key, msg):
-        partitions = self.client.topic_partitions[self.topic]
-        partition = self.partitioner.partition(key, partitions)
-        return self.send_messages(partition, msg)
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partitioners[topic] = \
+                self.partitioner_class(self.client.topic_partitions[topic])
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key, self.client.topic_partitions[topic])
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self.send_messages(topic, partition, msg)

     def __repr__(self):
-        return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<KeyedProducer batch=%s>' % self.async
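The consumer-side change above moves offset bookkeeping out of `_fetch()`: fetched messages are queued together with their partition, `_get_message()` optionally advances `self.offsets`, and `get_messages()` folds the whole batch of new offsets back in (and auto-commits) once per call while `_fetch()` tracks its own `fetch_offsets`. A minimal sketch of driving the reworked `SimpleConsumer`; the client construction, group, and topic names are illustrative assumptions and not part of this patch:

```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Illustrative setup only; the KafkaClient constructor is not part of this diff.
client = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(client, "my-group", "my-topic")

# With this patch, offsets for the whole batch are updated and auto-committed
# once after the call, rather than once per message inside _fetch().
messages = consumer.get_messages(count=10, block=True, timeout=1)
```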
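On the producer side, the topic is no longer fixed at construction time: `send_messages()` and `send()` take the topic per call, and partition cycles / partitioners are created lazily per topic, so one producer instance can serve several topics. A sketch of the reworked interface under the same illustrative assumptions:

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer, KeyedProducer

client = KafkaClient("localhost", 9092)  # illustrative; not part of this diff

# SimpleProducer: the topic moves from __init__() to send_messages()
producer = SimpleProducer(client)
producer.send_messages("topic-a", "message 1", "message 2")
producer.send_messages("topic-b", "another message")  # same instance, different topic

# KeyedProducer: the topic moves from __init__() to send();
# a partitioner is built lazily the first time a topic is seen
keyed = KeyedProducer(client)
keyed.send("topic-a", "key1", "keyed message")
```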