diff options
-rw-r--r-- | kafka/consumer.py | 698 | ||||
-rw-r--r-- | kafka/consumer/__init__.py | 6 | ||||
-rw-r--r-- | kafka/consumer/base.py | 169 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 248 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 318 | ||||
-rw-r--r-- | kafka/partitioner.py | 58 | ||||
-rw-r--r-- | kafka/partitioner/__init__.py | 6 | ||||
-rw-r--r-- | kafka/partitioner/base.py | 23 | ||||
-rw-r--r-- | kafka/partitioner/hashed.py | 12 | ||||
-rw-r--r-- | kafka/partitioner/roundrobin.py | 23 | ||||
-rw-r--r-- | kafka/producer/__init__.py | 6 | ||||
-rw-r--r-- | kafka/producer/base.py (renamed from kafka/producer.py) | 115 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 62 | ||||
-rw-r--r-- | kafka/producer/simple.py | 73 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 2 | ||||
-rw-r--r-- | test/test_failover_integration.py | 2 | ||||
-rw-r--r-- | test/test_producer.py | 2 |
17 files changed, 950 insertions, 873 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py deleted file mode 100644 index 42628e1..0000000 --- a/kafka/consumer.py +++ /dev/null @@ -1,698 +0,0 @@ -from __future__ import absolute_import - -try: - from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 -except ImportError: # python 2 - from itertools import izip_longest as izip_longest, repeat -import logging -import time -import numbers -from threading import Lock -from multiprocessing import Process, Queue as MPQueue, Event, Value - -import six - -try: - from Queue import Empty, Queue -except ImportError: # python 2 - from queue import Empty, Queue - -import kafka.common -from kafka.common import ( - FetchRequest, OffsetRequest, - OffsetCommitRequest, OffsetFetchRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData, - UnknownTopicOrPartitionError -) - -from kafka.util import ReentrantTimer - -log = logging.getLogger("kafka") - -AUTO_COMMIT_MSG_COUNT = 100 -AUTO_COMMIT_INTERVAL = 5000 - -FETCH_DEFAULT_BLOCK_TIMEOUT = 1 -FETCH_MAX_WAIT_TIME = 100 -FETCH_MIN_BYTES = 4096 -FETCH_BUFFER_SIZE_BYTES = 4096 -MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 - -ITER_TIMEOUT_SECONDS = 60 -NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 - - -class FetchContext(object): - """ - Class for managing the state of a consumer during fetch - """ - def __init__(self, consumer, block, timeout): - self.consumer = consumer - self.block = block - - if block: - if not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - self.timeout = timeout * 1000 - - def __enter__(self): - """Set fetch values based on blocking status""" - self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time - self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes - if self.block: - self.consumer.fetch_max_wait_time = self.timeout - self.consumer.fetch_min_bytes = 1 - else: - self.consumer.fetch_min_bytes = 0 - - def __exit__(self, type, value, traceback): - """Reset values""" - self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time - self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes - - -class Consumer(object): - """ - Base class to be used by other consumers. Not to be used directly - - This base class provides logic for - * initialization and fetching metadata of partitions - * Auto-commit logic - * APIs for fetching pending message count - """ - def __init__(self, client, group, topic, partitions=None, auto_commit=True, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL): - - self.client = client - self.topic = topic - self.group = group - self.client.load_metadata_for_topics(topic) - self.offsets = {} - - if not partitions: - partitions = self.client.get_partition_ids_for_topic(topic) - else: - assert all(isinstance(x, numbers.Integral) for x in partitions) - - # Variables for handling offset commits - self.commit_lock = Lock() - self.commit_timer = None - self.count_since_commit = 0 - self.auto_commit = auto_commit - self.auto_commit_every_n = auto_commit_every_n - self.auto_commit_every_t = auto_commit_every_t - - # Set up the auto-commit timer - if auto_commit is True and auto_commit_every_t is not None: - self.commit_timer = ReentrantTimer(auto_commit_every_t, - self.commit) - self.commit_timer.start() - - if auto_commit: - self.fetch_last_known_offsets(partitions) - else: - for partition in partitions: - self.offsets[partition] = 0 - - def fetch_last_known_offsets(self, partitions=None): - if not partitions: - partitions = self.client.get_partition_ids_for_topic(self.topic) - - def get_or_init_offset(resp): - try: - kafka.common.check_error(resp) - return resp.offset - except UnknownTopicOrPartitionError: - return 0 - - for partition in partitions: - req = OffsetFetchRequest(self.topic, partition) - (resp,) = self.client.send_offset_fetch_request(self.group, [req], - fail_on_error=False) - self.offsets[partition] = get_or_init_offset(resp) - self.fetch_offsets = self.offsets.copy() - - def commit(self, partitions=None): - """ - Commit offsets for this consumer - - partitions: list of partitions to commit, default is to commit - all of them - """ - - # short circuit if nothing happened. This check is kept outside - # to prevent un-necessarily acquiring a lock for checking the state - if self.count_since_commit == 0: - return - - with self.commit_lock: - # Do this check again, just in case the state has changed - # during the lock acquiring timeout - if self.count_since_commit == 0: - return - - reqs = [] - if not partitions: # commit all partitions - partitions = self.offsets.keys() - - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) - - reqs.append(OffsetCommitRequest(self.topic, partition, - offset, None)) - - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - kafka.common.check_error(resp) - - self.count_since_commit = 0 - - def _auto_commit(self): - """ - Check if we have to commit based on number of messages and commit - """ - - # Check if we are supposed to do an auto-commit - if not self.auto_commit or self.auto_commit_every_n is None: - return - - if self.count_since_commit >= self.auto_commit_every_n: - self.commit() - - def stop(self): - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - - def pending(self, partitions=None): - """ - Gets the pending message count - - partitions: list of partitions to check for, default is to check all - """ - if not partitions: - partitions = self.offsets.keys() - - total = 0 - reqs = [] - - for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - resps = self.client.send_offset_request(reqs) - for resp in resps: - partition = resp.partition - pending = resp.offsets[0] - offset = self.offsets[partition] - total += pending - offset - (1 if offset > 0 else 0) - - return total - - -class SimpleConsumer(Consumer): - """ - A simple consumer implementation that consumes all/specified partitions - for a topic - - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers - """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES, - buffer_size=FETCH_BUFFER_SIZE_BYTES, - max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): - super(SimpleConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - - if max_buffer_size is not None and buffer_size > max_buffer_size: - raise ValueError("buffer_size (%d) is greater than " - "max_buffer_size (%d)" % - (buffer_size, max_buffer_size)) - self.buffer_size = buffer_size - self.max_buffer_size = max_buffer_size - self.partition_info = False # Do not return partition info in msgs - self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.fetch_min_bytes = fetch_size_bytes - self.fetch_offsets = self.offsets.copy() - self.iter_timeout = iter_timeout - self.queue = Queue() - - def __repr__(self): - return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ - (self.group, self.topic, str(self.offsets.keys())) - - def provide_partition_info(self): - """ - Indicates that partition info must be returned by the consumer - """ - self.partition_info = True - - def seek(self, offset, whence): - """ - Alter the current offset in the consumer, similar to fseek - - offset: how much to modify the offset - whence: where to modify it from - 0 is relative to the earliest available offset (head) - 1 is relative to the current offset - 2 is relative to the latest known offset (tail) - """ - - if whence == 1: # relative to current position - for partition, _offset in self.offsets.items(): - self.offsets[partition] = _offset + offset - elif whence in (0, 2): # relative to beginning or end - # divide the request offset by number of partitions, - # distribute the remained evenly - (delta, rem) = divmod(offset, len(self.offsets)) - deltas = {} - for partition, r in izip_longest(self.offsets.keys(), - repeat(1, rem), fillvalue=0): - deltas[partition] = delta + r - - reqs = [] - for partition in self.offsets.keys(): - if whence == 0: - reqs.append(OffsetRequest(self.topic, partition, -2, 1)) - elif whence == 2: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - else: - pass - - resps = self.client.send_offset_request(reqs) - for resp in resps: - self.offsets[resp.partition] = \ - resp.offsets[0] + deltas[resp.partition] - else: - raise ValueError("Unexpected value for `whence`, %d" % whence) - - # Reset queue and fetch offsets since they are invalid - self.fetch_offsets = self.offsets.copy() - if self.auto_commit: - self.count_since_commit += 1 - self.commit() - - self.queue = Queue() - - def get_messages(self, count=1, block=True, timeout=0.1): - """ - Fetch the specified number of messages - - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. - """ - messages = [] - if timeout is not None: - max_time = time.time() + timeout - - new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - result = self._get_message(block, timeout, get_partition_info=True, - update_offset=False) - if result: - partition, message = result - if self.partition_info: - messages.append(result) - else: - messages.append(message) - new_offsets[partition] = message.offset + 1 - count -= 1 - else: - # Ran out of messages for the last request. - if not block: - # If we're not blocking, break. - break - if timeout is not None: - # If we're blocking and have a timeout, reduce it to the - # appropriate value - timeout = max_time - time.time() - - # Update and commit offsets if necessary - self.offsets.update(new_offsets) - self.count_since_commit += len(messages) - self._auto_commit() - return messages - - def get_message(self, block=True, timeout=0.1, get_partition_info=None): - return self._get_message(block, timeout, get_partition_info) - - def _get_message(self, block=True, timeout=0.1, get_partition_info=None, - update_offset=True): - """ - If no messages can be fetched, returns None. - If get_partition_info is None, it defaults to self.partition_info - If get_partition_info is True, returns (partition, message) - If get_partition_info is False, returns message - """ - if self.queue.empty(): - # We're out of messages, go grab some more. - with FetchContext(self, block, timeout): - self._fetch() - try: - partition, message = self.queue.get_nowait() - - if update_offset: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - if get_partition_info is None: - get_partition_info = self.partition_info - if get_partition_info: - return partition, message - else: - return message - except Empty: - return None - - def __iter__(self): - if self.iter_timeout is None: - timeout = ITER_TIMEOUT_SECONDS - else: - timeout = self.iter_timeout - - while True: - message = self.get_message(True, timeout) - if message: - yield message - elif self.iter_timeout is None: - # We did not receive any message yet but we don't have a - # timeout, so give up the CPU for a while before trying again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - else: - # Timed out waiting for a message - break - - def _fetch(self): - # Create fetch request payloads for all the partitions - partitions = dict((p, self.buffer_size) - for p in self.fetch_offsets.keys()) - while partitions: - requests = [] - for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) - - retry_partitions = {} - for resp in responses: - partition = resp.partition - buffer_size = partitions[partition] - try: - for message in resp.messages: - # Put the message in our queue - self.queue.put((partition, message)) - self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall: - if (self.max_buffer_size is not None and - buffer_size == self.max_buffer_size): - log.error("Max fetch size %d too small", - self.max_buffer_size) - raise - if self.max_buffer_size is None: - buffer_size *= 2 - else: - buffer_size = max(buffer_size * 2, - self.max_buffer_size) - log.warn("Fetch size too small, increase to %d (2x) " - "and retry", buffer_size) - retry_partitions[partition] = buffer_size - except ConsumerNoMoreData as e: - log.debug("Iteration was ended by %r", e) - except StopIteration: - # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) - partitions = retry_partitions - -def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): - """ - A child process worker which consumes messages based on the - notifications given by the controller process - - NOTE: Ideally, this should have been a method inside the Consumer - class. However, multiprocessing module has issues in windows. The - functionality breaks unless this function is kept outside of a class - """ - - # Make the child processes open separate socket connections - client.reinit() - - # We will start consumers without auto-commit. Auto-commit will be - # done by the master controller process. - consumer = SimpleConsumer(client, group, topic, - partitions=chunk, - auto_commit=False, - auto_commit_every_n=None, - auto_commit_every_t=None) - - # Ensure that the consumer provides the partition information - consumer.provide_partition_info() - - while True: - # Wait till the controller indicates us to start consumption - start.wait() - - # If we are asked to quit, do so - if exit.is_set(): - break - - # Consume messages and add them to the queue. If the controller - # indicates a specific number of messages, follow that advice - count = 0 - - message = consumer.get_message() - if message: - queue.put(message) - count += 1 - - # We have reached the required size. The controller might have - # more than what he needs. Wait for a while. - # Without this logic, it is possible that we run into a big - # loop consuming all available messages before the controller - # can reset the 'start' event - if count == size.value: - pause.wait() - - else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - - consumer.stop() - - -class MultiProcessConsumer(Consumer): - """ - A consumer implementation that consumes partitions for a topic in - parallel using multiple processes - - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers - """ - def __init__(self, client, group, topic, auto_commit=True, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, partitions_per_proc=0): - - # Initiate the base consumer class - super(MultiProcessConsumer, self).__init__( - client, group, topic, - partitions=None, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - - # Variables for managing and controlling the data flow from - # consumer child process to master - self.queue = MPQueue(1024) # Child consumers dump messages into this - self.start = Event() # Indicates the consumers to start fetch - self.exit = Event() # Requests the consumers to shutdown - self.pause = Event() # Requests the consumers to pause fetch - self.size = Value('i', 0) # Indicator of number of messages to fetch - - partitions = self.offsets.keys() - - # If unspecified, start one consumer per partition - # The logic below ensures that - # * we do not cross the num_procs limit - # * we have an even distribution of partitions among processes - if not partitions_per_proc: - partitions_per_proc = round(len(partitions) * 1.0 / num_procs) - if partitions_per_proc < num_procs * 0.5: - partitions_per_proc += 1 - - # The final set of chunks - chunker = lambda *x: [] + list(x) - chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) - - self.procs = [] - for chunk in chunks: - chunk = filter(lambda x: x is not None, chunk) - args = (client.copy(), - group, topic, list(chunk), - self.queue, self.start, self.exit, - self.pause, self.size) - - proc = Process(target=_mp_consume, args=args) - proc.daemon = True - proc.start() - self.procs.append(proc) - - def __repr__(self): - return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \ - (self.group, self.topic, len(self.procs)) - - def stop(self): - # Set exit and start off all waiting consumers - self.exit.set() - self.pause.set() - self.start.set() - - for proc in self.procs: - proc.join() - proc.terminate() - - super(MultiProcessConsumer, self).stop() - - def __iter__(self): - """ - Iterator to consume the messages available on this consumer - """ - # Trigger the consumer procs to start off. - # We will iterate till there are no more messages available - self.size.value = 0 - self.pause.set() - - while True: - self.start.set() - try: - # We will block for a small while so that the consumers get - # a chance to run and put some messages in the queue - # TODO: This is a hack and will make the consumer block for - # at least one second. Need to find a better way of doing this - partition, message = self.queue.get(block=True, timeout=1) - except Empty: - break - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + 1 - self.start.clear() - self.count_since_commit += 1 - self._auto_commit() - yield message - - self.start.clear() - - def get_messages(self, count=1, block=True, timeout=10): - """ - Fetch the specified number of messages - - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. - """ - messages = [] - - # Give a size hint to the consumers. Each consumer process will fetch - # a maximum of "count" messages. This will fetch more messages than - # necessary, but these will not be committed to kafka. Also, the extra - # messages can be provided in subsequent runs - self.size.value = count - self.pause.clear() - - if timeout is not None: - max_time = time.time() + timeout - - new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - # Trigger consumption only if the queue is empty - # By doing this, we will ensure that consumers do not - # go into overdrive and keep consuming thousands of - # messages when the user might need only a few - if self.queue.empty(): - self.start.set() - - try: - partition, message = self.queue.get(block, timeout) - except Empty: - break - - messages.append(message) - new_offsets[partition] = message.offset + 1 - count -= 1 - if timeout is not None: - timeout = max_time - time.time() - - self.size.value = 0 - self.start.clear() - self.pause.set() - - # Update and commit offsets if necessary - self.offsets.update(new_offsets) - self.count_since_commit += len(messages) - self._auto_commit() - - return messages diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py new file mode 100644 index 0000000..d2fa306 --- /dev/null +++ b/kafka/consumer/__init__.py @@ -0,0 +1,6 @@ +from .simple import SimpleConsumer +from .multiprocess import MultiProcessConsumer + +__all__ = [ + 'SimpleConsumer', 'MultiProcessConsumer' +] diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py new file mode 100644 index 0000000..506b405 --- /dev/null +++ b/kafka/consumer/base.py @@ -0,0 +1,169 @@ +from __future__ import absolute_import + +import logging +import numbers +from threading import Lock + +import kafka.common +from kafka.common import ( + OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + UnknownTopicOrPartitionError +) + +from kafka.util import ReentrantTimer + +log = logging.getLogger("kafka") + +AUTO_COMMIT_MSG_COUNT = 100 +AUTO_COMMIT_INTERVAL = 5000 + +FETCH_DEFAULT_BLOCK_TIMEOUT = 1 +FETCH_MAX_WAIT_TIME = 100 +FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 + +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 + + +class Consumer(object): + """ + Base class to be used by other consumers. Not to be used directly + + This base class provides logic for + * initialization and fetching metadata of partitions + * Auto-commit logic + * APIs for fetching pending message count + """ + def __init__(self, client, group, topic, partitions=None, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): + + self.client = client + self.topic = topic + self.group = group + self.client.load_metadata_for_topics(topic) + self.offsets = {} + + if not partitions: + partitions = self.client.get_partition_ids_for_topic(topic) + else: + assert all(isinstance(x, numbers.Integral) for x in partitions) + + # Variables for handling offset commits + self.commit_lock = Lock() + self.commit_timer = None + self.count_since_commit = 0 + self.auto_commit = auto_commit + self.auto_commit_every_n = auto_commit_every_n + self.auto_commit_every_t = auto_commit_every_t + + # Set up the auto-commit timer + if auto_commit is True and auto_commit_every_t is not None: + self.commit_timer = ReentrantTimer(auto_commit_every_t, + self.commit) + self.commit_timer.start() + + if auto_commit: + self.fetch_last_known_offsets(partitions) + else: + for partition in partitions: + self.offsets[partition] = 0 + + def fetch_last_known_offsets(self, partitions=None): + if not partitions: + partitions = self.client.get_partition_ids_for_topic(self.topic) + + def get_or_init_offset(resp): + try: + kafka.common.check_error(resp) + return resp.offset + except UnknownTopicOrPartitionError: + return 0 + + for partition in partitions: + req = OffsetFetchRequest(self.topic, partition) + (resp,) = self.client.send_offset_fetch_request(self.group, [req], + fail_on_error=False) + self.offsets[partition] = get_or_init_offset(resp) + self.fetch_offsets = self.offsets.copy() + + def commit(self, partitions=None): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit + all of them + """ + + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state + if self.count_since_commit == 0: + return + + with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + + reqs = [] + if not partitions: # commit all partitions + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) + for resp in resps: + kafka.common.check_error(resp) + + self.count_since_commit = 0 + + def _auto_commit(self): + """ + Check if we have to commit based on number of messages and commit + """ + + # Check if we are supposed to do an auto-commit + if not self.auto_commit or self.auto_commit_every_n is None: + return + + if self.count_since_commit >= self.auto_commit_every_n: + self.commit() + + def stop(self): + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + + def pending(self, partitions=None): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if not partitions: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py new file mode 100644 index 0000000..912e64b --- /dev/null +++ b/kafka/consumer/multiprocess.py @@ -0,0 +1,248 @@ +from __future__ import absolute_import + +import logging +import time +from multiprocessing import Process, Queue as MPQueue, Event, Value + +try: + from Queue import Empty +except ImportError: # python 2 + from queue import Empty + +from .base import ( + AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, + NO_MESSAGES_WAIT_TIME_SECONDS +) +from .simple import Consumer, SimpleConsumer + +log = logging.getLogger("kafka") + + +def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): + """ + A child process worker which consumes messages based on the + notifications given by the controller process + + NOTE: Ideally, this should have been a method inside the Consumer + class. However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ + + # Make the child processes open separate socket connections + client.reinit() + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master controller process. + consumer = SimpleConsumer(client, group, topic, + partitions=chunk, + auto_commit=False, + auto_commit_every_n=None, + auto_commit_every_t=None) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + # Wait till the controller indicates us to start consumption + start.wait() + + # If we are asked to quit, do so + if exit.is_set(): + break + + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice + count = 0 + + message = consumer.get_message() + if message: + queue.put(message) + count += 1 + + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. + # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event + if count == size.value: + pause.wait() + + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + + consumer.stop() + + +class MultiProcessConsumer(Consumer): + """ + A consumer implementation that consumes partitions for a topic in + parallel using multiple processes + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + num_procs=1, partitions_per_proc=0): + + # Initiate the base consumer class + super(MultiProcessConsumer, self).__init__( + client, group, topic, + partitions=None, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + # Variables for managing and controlling the data flow from + # consumer child process to master + self.queue = MPQueue(1024) # Child consumers dump messages into this + self.start = Event() # Indicates the consumers to start fetch + self.exit = Event() # Requests the consumers to shutdown + self.pause = Event() # Requests the consumers to pause fetch + self.size = Value('i', 0) # Indicator of number of messages to fetch + + partitions = self.offsets.keys() + + # If unspecified, start one consumer per partition + # The logic below ensures that + # * we do not cross the num_procs limit + # * we have an even distribution of partitions among processes + if not partitions_per_proc: + partitions_per_proc = round(len(partitions) * 1.0 / num_procs) + if partitions_per_proc < num_procs * 0.5: + partitions_per_proc += 1 + + # The final set of chunks + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) + + self.procs = [] + for chunk in chunks: + chunk = filter(lambda x: x is not None, chunk) + args = (client.copy(), + group, topic, list(chunk), + self.queue, self.start, self.exit, + self.pause, self.size) + + proc = Process(target=_mp_consume, args=args) + proc.daemon = True + proc.start() + self.procs.append(proc) + + def __repr__(self): + return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \ + (self.group, self.topic, len(self.procs)) + + def stop(self): + # Set exit and start off all waiting consumers + self.exit.set() + self.pause.set() + self.start.set() + + for proc in self.procs: + proc.join() + proc.terminate() + + super(MultiProcessConsumer, self).stop() + + def __iter__(self): + """ + Iterator to consume the messages available on this consumer + """ + # Trigger the consumer procs to start off. + # We will iterate till there are no more messages available + self.size.value = 0 + self.pause.set() + + while True: + self.start.set() + try: + # We will block for a small while so that the consumers get + # a chance to run and put some messages in the queue + # TODO: This is a hack and will make the consumer block for + # at least one second. Need to find a better way of doing this + partition, message = self.queue.get(block=True, timeout=1) + except Empty: + break + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + 1 + self.start.clear() + self.count_since_commit += 1 + self._auto_commit() + yield message + + self.start.clear() + + def get_messages(self, count=1, block=True, timeout=10): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. + """ + messages = [] + + # Give a size hint to the consumers. Each consumer process will fetch + # a maximum of "count" messages. This will fetch more messages than + # necessary, but these will not be committed to kafka. Also, the extra + # messages can be provided in subsequent runs + self.size.value = count + self.pause.clear() + + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): + # Trigger consumption only if the queue is empty + # By doing this, we will ensure that consumers do not + # go into overdrive and keep consuming thousands of + # messages when the user might need only a few + if self.queue.empty(): + self.start.set() + + try: + partition, message = self.queue.get(block, timeout) + except Empty: + break + + messages.append(message) + new_offsets[partition] = message.offset + 1 + count -= 1 + if timeout is not None: + timeout = max_time - time.time() + + self.size.value = 0 + self.start.clear() + self.pause.set() + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + + return messages diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py new file mode 100644 index 0000000..dcc71a9 --- /dev/null +++ b/kafka/consumer/simple.py @@ -0,0 +1,318 @@ +from __future__ import absolute_import + +try: + from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 +except ImportError: # python 2 + from itertools import izip_longest as izip_longest, repeat +import logging +import time + +import six + +try: + from Queue import Empty, Queue +except ImportError: # python 2 + from queue import Empty, Queue + +from kafka.common import ( + FetchRequest, OffsetRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData +) +from .base import ( + Consumer, + FETCH_DEFAULT_BLOCK_TIMEOUT, + AUTO_COMMIT_MSG_COUNT, + AUTO_COMMIT_INTERVAL, + FETCH_MIN_BYTES, + FETCH_BUFFER_SIZE_BYTES, + MAX_FETCH_BUFFER_SIZE_BYTES, + FETCH_MAX_WAIT_TIME, + ITER_TIMEOUT_SECONDS, + NO_MESSAGES_WAIT_TIME_SECONDS +) + +log = logging.getLogger("kafka") + +class FetchContext(object): + """ + Class for managing the state of a consumer during fetch + """ + def __init__(self, consumer, block, timeout): + self.consumer = consumer + self.block = block + + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 + + def __enter__(self): + """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes + if self.block: + self.consumer.fetch_max_wait_time = self.timeout + self.consumer.fetch_min_bytes = 1 + else: + self.consumer.fetch_min_bytes = 0 + + def __exit__(self, type, value, traceback): + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes + + +class SimpleConsumer(Consumer): + """ + A simple consumer implementation that consumes all/specified partitions + for a topic + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. A manual call to commit will also reset + these triggers + """ + def __init__(self, client, group, topic, auto_commit=True, partitions=None, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): + super(SimpleConsumer, self).__init__( + client, group, topic, + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) + + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) + self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size + self.partition_info = False # Do not return partition info in msgs + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = fetch_size_bytes + self.fetch_offsets = self.offsets.copy() + self.iter_timeout = iter_timeout + self.queue = Queue() + + def __repr__(self): + return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ + (self.group, self.topic, str(self.offsets.keys())) + + def provide_partition_info(self): + """ + Indicates that partition info must be returned by the consumer + """ + self.partition_info = True + + def seek(self, offset, whence): + """ + Alter the current offset in the consumer, similar to fseek + + offset: how much to modify the offset + whence: where to modify it from + 0 is relative to the earliest available offset (head) + 1 is relative to the current offset + 2 is relative to the latest known offset (tail) + """ + + if whence == 1: # relative to current position + for partition, _offset in self.offsets.items(): + self.offsets[partition] = _offset + offset + elif whence in (0, 2): # relative to beginning or end + # divide the request offset by number of partitions, + # distribute the remained evenly + (delta, rem) = divmod(offset, len(self.offsets)) + deltas = {} + for partition, r in izip_longest(self.offsets.keys(), + repeat(1, rem), fillvalue=0): + deltas[partition] = delta + r + + reqs = [] + for partition in self.offsets.keys(): + if whence == 0: + reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + else: + pass + + resps = self.client.send_offset_request(reqs) + for resp in resps: + self.offsets[resp.partition] = \ + resp.offsets[0] + deltas[resp.partition] + else: + raise ValueError("Unexpected value for `whence`, %d" % whence) + + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + if self.auto_commit: + self.count_since_commit += 1 + self.commit() + + self.queue = Queue() + + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. + """ + messages = [] + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 + count -= 1 + else: + # Ran out of messages for the last request. + if not block: + # If we're not blocking, break. + break + if timeout is not None: + # If we're blocking and have a timeout, reduce it to the + # appropriate value + timeout = max_time - time.time() + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages + + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): + """ + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message + """ + if self.queue.empty(): + # We're out of messages, go grab some more. + with FetchContext(self, block, timeout): + self._fetch() + try: + partition, message = self.queue.get_nowait() + + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: + return partition, message + else: + return message + except Empty: + return None + + def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout + + while True: + message = self.get_message(True, timeout) + if message: + yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + else: + # Timed out waiting for a message + break + + def _fetch(self): + # Create fetch request payloads for all the partitions + partitions = dict((p, self.buffer_size) + for p in self.fetch_offsets.keys()) + while partitions: + requests = [] + for partition, buffer_size in six.iteritems(partitions): + requests.append(FetchRequest(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = {} + for resp in responses: + partition = resp.partition + buffer_size = partitions[partition] + try: + for message in resp.messages: + # Put the message in our queue + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 + except ConsumerFetchSizeTooSmall: + if (self.max_buffer_size is not None and + buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise + if self.max_buffer_size is None: + buffer_size *= 2 + else: + buffer_size = max(buffer_size * 2, + self.max_buffer_size) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", buffer_size) + retry_partitions[partition] = buffer_size + except ConsumerNoMoreData as e: + log.debug("Iteration was ended by %r", e) + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) + partitions = retry_partitions diff --git a/kafka/partitioner.py b/kafka/partitioner.py deleted file mode 100644 index 695dd6f..0000000 --- a/kafka/partitioner.py +++ /dev/null @@ -1,58 +0,0 @@ -from itertools import cycle - - -class Partitioner(object): - """ - Base class for a partitioner - """ - def __init__(self, partitions): - """ - Initialize the partitioner - - partitions - A list of available partitions (during startup) - """ - self.partitions = partitions - - def partition(self, key, partitions): - """ - Takes a string key and num_partitions as argument and returns - a partition to be used for the message - - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing - """ - raise NotImplementedError('partition function has to be implemented') - - -class RoundRobinPartitioner(Partitioner): - """ - Implements a round robin partitioner which sends data to partitions - in a round robin fashion - """ - def __init__(self, partitions): - super(RoundRobinPartitioner, self).__init__(partitions) - self.iterpart = cycle(partitions) - - def _set_partitions(self, partitions): - self.partitions = partitions - self.iterpart = cycle(partitions) - - def partition(self, key, partitions): - # Refresh the partition list if necessary - if self.partitions != partitions: - self._set_partitions(partitions) - - return next(self.iterpart) - - -class HashedPartitioner(Partitioner): - """ - Implements a partitioner which selects the target partition based on - the hash of the key - """ - def partition(self, key, partitions): - size = len(partitions) - idx = hash(key) % size - - return partitions[idx] diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py new file mode 100644 index 0000000..fdb19bb --- /dev/null +++ b/kafka/partitioner/__init__.py @@ -0,0 +1,6 @@ +from .roundrobin import RoundRobinPartitioner +from .hashed import HashedPartitioner + +__all__ = [ + 'RoundRobinPartitioner', 'HashedPartitioner' +] diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py new file mode 100644 index 0000000..c62b7ed --- /dev/null +++ b/kafka/partitioner/base.py @@ -0,0 +1,23 @@ + +class Partitioner(object): + """ + Base class for a partitioner + """ + def __init__(self, partitions): + """ + Initialize the partitioner + + partitions - A list of available partitions (during startup) + """ + self.partitions = partitions + + def partition(self, key, partitions): + """ + Takes a string key and num_partitions as argument and returns + a partition to be used for the message + + partitions - The list of partitions is passed in every call. This + may look like an overhead, but it will be useful + (in future) when we handle cases like rebalancing + """ + raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py new file mode 100644 index 0000000..587a3de --- /dev/null +++ b/kafka/partitioner/hashed.py @@ -0,0 +1,12 @@ +from .base import Partitioner + +class HashedPartitioner(Partitioner): + """ + Implements a partitioner which selects the target partition based on + the hash of the key + """ + def partition(self, key, partitions): + size = len(partitions) + idx = hash(key) % size + + return partitions[idx] diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py new file mode 100644 index 0000000..54d00da --- /dev/null +++ b/kafka/partitioner/roundrobin.py @@ -0,0 +1,23 @@ +from itertools import cycle + +from .base import Partitioner + +class RoundRobinPartitioner(Partitioner): + """ + Implements a round robin partitioner which sends data to partitions + in a round robin fashion + """ + def __init__(self, partitions): + super(RoundRobinPartitioner, self).__init__(partitions) + self.iterpart = cycle(partitions) + + def _set_partitions(self, partitions): + self.partitions = partitions + self.iterpart = cycle(partitions) + + def partition(self, key, partitions): + # Refresh the partition list if necessary + if self.partitions != partitions: + self._set_partitions(partitions) + + return next(self.iterpart) diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py new file mode 100644 index 0000000..bc0e7c6 --- /dev/null +++ b/kafka/producer/__init__.py @@ -0,0 +1,6 @@ +from .simple import SimpleProducer +from .keyed import KeyedProducer + +__all__ = [ + 'SimpleProducer', 'KeyedProducer' +] diff --git a/kafka/producer.py b/kafka/producer/base.py index f186649..6c91364 100644 --- a/kafka/producer.py +++ b/kafka/producer/base.py @@ -2,23 +2,19 @@ from __future__ import absolute_import import logging import time -import random try: from queue import Empty except ImportError: from Queue import Empty from collections import defaultdict -from itertools import cycle from multiprocessing import Queue, Process import six -from six.moves import xrange from kafka.common import ( - ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError + ProduceRequest, TopicAndPartition, UnsupportedCodecError ) -from kafka.partitioner import HashedPartitioner from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set log = logging.getLogger("kafka") @@ -208,112 +204,3 @@ class Producer(object): if self.proc.is_alive(): self.proc.terminate() - - -class SimpleProducer(Producer): - """ - A simple, round-robin producer. Each message goes to exactly one partition - - Params: - client - The Kafka client instance to use - async - If True, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - random_start - If true, randomize the initial partition which the - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition - """ - def __init__(self, client, async=False, - req_acks=Producer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, - codec=None, - batch_send=False, - batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=False): - self.partition_cycles = {} - self.random_start = random_start - super(SimpleProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, - batch_send_every_n, - batch_send_every_t) - - def _next_partition(self, topic): - if topic not in self.partition_cycles: - if not self.client.has_metadata_for_topic(topic): - self.client.load_metadata_for_topics(topic) - - self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) - - # Randomize the initial partition that is returned - if self.random_start: - num_partitions = len(self.client.get_partition_ids_for_topic(topic)) - for _ in xrange(random.randint(0, num_partitions-1)): - next(self.partition_cycles[topic]) - - return next(self.partition_cycles[topic]) - - def send_messages(self, topic, *msg): - partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages(topic, partition, *msg) - - def __repr__(self): - return '<SimpleProducer batch=%s>' % self.async - - -class KeyedProducer(Producer): - """ - A producer which distributes messages to partitions based on the key - - Args: - client - The kafka client instance - partitioner - A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner - async - If True, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - """ - def __init__(self, client, partitioner=None, async=False, - req_acks=Producer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, - codec=None, - batch_send=False, - batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - if not partitioner: - partitioner = HashedPartitioner - self.partitioner_class = partitioner - self.partitioners = {} - - super(KeyedProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, - batch_send_every_n, - batch_send_every_t) - - def _next_partition(self, topic, key): - if topic not in self.partitioners: - if not self.client.has_metadata_for_topic(topic): - self.client.load_metadata_for_topics(topic) - - self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) - - partitioner = self.partitioners[topic] - return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) - - def send(self, topic, key, msg): - partition = self._next_partition(topic, key) - return self.send_messages(topic, partition, msg) - - def __repr__(self): - return '<KeyedProducer batch=%s>' % self.async diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py new file mode 100644 index 0000000..d2311c6 --- /dev/null +++ b/kafka/producer/keyed.py @@ -0,0 +1,62 @@ +from __future__ import absolute_import + +import logging + +from kafka.partitioner import HashedPartitioner +from .base import ( + Producer, BATCH_SEND_DEFAULT_INTERVAL, + BATCH_SEND_MSG_COUNT +) + +log = logging.getLogger("kafka") + + +class KeyedProducer(Producer): + """ + A producer which distributes messages to partitions based on the key + + Args: + client - The kafka client instance + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async - If True, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these + ack_timeout - Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send - If True, messages are send in batches + batch_send_every_n - If set, messages are send in batches of this size + batch_send_every_t - If set, messages are send after this timeout + """ + def __init__(self, client, partitioner=None, async=False, + req_acks=Producer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, + codec=None, + batch_send=False, + batch_send_every_n=BATCH_SEND_MSG_COUNT, + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + if not partitioner: + partitioner = HashedPartitioner + self.partitioner_class = partitioner + self.partitioners = {} + + super(KeyedProducer, self).__init__(client, async, req_acks, + ack_timeout, codec, batch_send, + batch_send_every_n, + batch_send_every_t) + + def _next_partition(self, topic, key): + if topic not in self.partitioners: + if not self.client.has_metadata_for_topic(topic): + self.client.load_metadata_for_topics(topic) + + self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) + + partitioner = self.partitioners[topic] + return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) + + def send(self, topic, key, msg): + partition = self._next_partition(topic, key) + return self.send_messages(topic, partition, msg) + + def __repr__(self): + return '<KeyedProducer batch=%s>' % self.async diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py new file mode 100644 index 0000000..a10fa8c --- /dev/null +++ b/kafka/producer/simple.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import + +import logging +import random + +from itertools import cycle + +from six.moves import xrange + +from .base import ( + Producer, BATCH_SEND_DEFAULT_INTERVAL, + BATCH_SEND_MSG_COUNT +) + +log = logging.getLogger("kafka") + + +class SimpleProducer(Producer): + """ + A simple, round-robin producer. Each message goes to exactly one partition + + Params: + client - The Kafka client instance to use + async - If True, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these + req_acks - A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout - Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send - If True, messages are send in batches + batch_send_every_n - If set, messages are send in batches of this size + batch_send_every_t - If set, messages are send after this timeout + random_start - If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition + """ + def __init__(self, client, async=False, + req_acks=Producer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, + codec=None, + batch_send=False, + batch_send_every_n=BATCH_SEND_MSG_COUNT, + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + random_start=False): + self.partition_cycles = {} + self.random_start = random_start + super(SimpleProducer, self).__init__(client, async, req_acks, + ack_timeout, codec, batch_send, + batch_send_every_n, + batch_send_every_t) + + def _next_partition(self, topic): + if topic not in self.partition_cycles: + if not self.client.has_metadata_for_topic(topic): + self.client.load_metadata_for_topics(topic) + + self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) + + # Randomize the initial partition that is returned + if self.random_start: + num_partitions = len(self.client.get_partition_ids_for_topic(topic)) + for _ in xrange(random.randint(0, num_partitions-1)): + next(self.partition_cycles[topic]) + + return next(self.partition_cycles[topic]) + + def send_messages(self, topic, *msg): + partition = self._next_partition(topic) + return super(SimpleProducer, self).send_messages(topic, partition, *msg) + + def __repr__(self): + return '<SimpleProducer batch=%s>' % self.async diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 6895c0e..717eac5 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -4,7 +4,7 @@ from six.moves import xrange from kafka import SimpleConsumer, MultiProcessConsumer, create_message from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall -from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES +from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index b572b4e..d307d41 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -6,7 +6,7 @@ from . import unittest from kafka import KafkaClient, SimpleConsumer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError -from kafka.producer import Producer +from kafka.producer.base import Producer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( diff --git a/test/test_producer.py b/test/test_producer.py index 1375525..caf8fe3 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -5,7 +5,7 @@ import logging from mock import MagicMock from . import unittest -from kafka.producer import Producer +from kafka.producer.base import Producer class TestKafkaProducer(unittest.TestCase): def test_producer_message_types(self): |