author | Jeff Widman <jeff@jeffwidman.com> | 2019-10-11 12:03:22 -0700
committer | GitHub <noreply@github.com> | 2019-10-11 12:03:22 -0700
commit | 3631bfa009a28767a2057c9beee470acaa6597d5 (patch)
tree | e10b73861a33d83a95b6496ef3074ee3caeaae41 /kafka/consumer/simple.py
parent | 6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff)
download | kafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing:
* `SimpleClient`
* `SimpleConsumer`
* `SimpleProducer`
* Old partitioners used by `SimpleProducer`; these are superseded by
the `DefaultPartitioner`
These have been deprecated for several years in favor of `KafkaClient`
/ `KafkaConsumer` / `KafkaProducer`.
Since 2.0 allows breaking changes, we are removing the deprecated
classes.
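
For reference, a minimal migration sketch using the replacement classes (the broker address, topic, and group names below are placeholder assumptions, not values from this commit):

```python
from kafka import KafkaConsumer, KafkaProducer

# KafkaProducer replaces SimpleProducer; keyed messages are routed by
# DefaultPartitioner, so no custom partitioner is needed.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', key=b'my-key', value=b'hello')
producer.flush()

# KafkaConsumer replaces SimpleConsumer; group_id enables offset
# commit/fetch, and auto_offset_reset='latest' is the analogue of
# SimpleConsumer's auto_offset_reset='largest'.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='latest')
for message in consumer:
    print(message.partition, message.offset, message.value)
```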
Additionally, since the only usage of `unittest` was in tests for these
old Simple* clients, this change also drops `unittest` from the library.
All tests now run under `pytest`.
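
For contrast, a toy sketch of the two test styles (an invented example, not a test from this suite):

```python
import unittest


# unittest style (removed along with the Simple* clients):
class TestAddition(unittest.TestCase):
    def test_addition(self):
        self.assertEqual(1 + 1, 2)


# pytest style (what the suite uses now): a plain function with a
# bare assert; pytest collects any function named test_*.
def test_addition_pytest():
    assert 1 + 1 == 2
```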
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 444
1 file changed, 0 insertions, 444 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
deleted file mode 100644
index a6a64a5..0000000
--- a/kafka/consumer/simple.py
+++ /dev/null
@@ -1,444 +0,0 @@
-from __future__ import absolute_import
-
-try:
-    from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611
-except ImportError:
-    from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611
-import logging
-import sys
-import time
-import warnings
-
-from kafka.vendor import six
-from kafka.vendor.six.moves import queue # pylint: disable=import-error
-
-from kafka.consumer.base import (
-    Consumer,
-    FETCH_DEFAULT_BLOCK_TIMEOUT,
-    AUTO_COMMIT_MSG_COUNT,
-    AUTO_COMMIT_INTERVAL,
-    FETCH_MIN_BYTES,
-    FETCH_BUFFER_SIZE_BYTES,
-    MAX_FETCH_BUFFER_SIZE_BYTES,
-    FETCH_MAX_WAIT_TIME,
-    ITER_TIMEOUT_SECONDS,
-    NO_MESSAGES_WAIT_TIME_SECONDS
-)
-from kafka.errors import (
-    KafkaError, ConsumerFetchSizeTooSmall,
-    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
-    OffsetOutOfRangeError, FailedPayloadsError, check_error
-)
-from kafka.protocol.message import PartialMessage
-from kafka.structs import FetchRequestPayload, OffsetRequestPayload
-
-
-log = logging.getLogger(__name__)
-
-
-class FetchContext(object):
-    """
-    Class for managing the state of a consumer during fetch
-    """
-    def __init__(self, consumer, block, timeout):
-        warnings.warn('deprecated - this class will be removed in a future'
-                      ' release', DeprecationWarning)
-        self.consumer = consumer
-        self.block = block
-
-        if block:
-            if not timeout:
-                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-            self.timeout = timeout * 1000
-
-    def __enter__(self):
-        """Set fetch values based on blocking status"""
-        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
-        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
-        if self.block:
-            self.consumer.fetch_max_wait_time = self.timeout
-            self.consumer.fetch_min_bytes = 1
-        else:
-            self.consumer.fetch_min_bytes = 0
-
-    def __exit__(self, type, value, traceback):
-        """Reset values"""
-        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
-        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
-
-
-class SimpleConsumer(Consumer):
-    """
-    A simple consumer implementation that consumes all/specified partitions
-    for a topic
-
-    Arguments:
-        client: a connected SimpleClient
-        group: a name for this consumer, used for offset storage and must be unique
-            If you are connecting to a server that does not support offset
-            commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
-        topic: the topic to consume
-
-    Keyword Arguments:
-        partitions: An optional list of partitions to consume the data from
-
-        auto_commit: default True. Whether or not to auto commit the offsets
-
-        auto_commit_every_n: default 100. How many messages to consume
-            before a commit
-
-        auto_commit_every_t: default 5000. How much time (in milliseconds) to
-            wait before commit
-        fetch_size_bytes: number of bytes to request in a FetchRequest
-
-        buffer_size: default 4K. Initial number of bytes to tell kafka we
-            have available. This will double as needed.
-
-        max_buffer_size: default 16K. Max number of bytes to tell kafka we have
-            available. None means no limit.
-
-        iter_timeout: default None. How much time (in seconds) to wait for a
-            message in the iterator before exiting. None means no
-            timeout, so it will wait forever.
-
-        auto_offset_reset: default largest. Reset partition offsets upon
-            OffsetOutOfRangeError. Valid values are largest and smallest.
-            Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
-
-    Auto commit details:
-    If both auto_commit_every_n and auto_commit_every_t are set, they will
-    reset one another when one is triggered. These triggers simply call the
-    commit method on this class. A manual call to commit will also reset
-    these triggers
-    """
-    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
-                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES,
-                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
-                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
-                 iter_timeout=None,
-                 auto_offset_reset='largest'):
-        warnings.warn('deprecated - this class will be removed in a future'
-                      ' release. Use KafkaConsumer instead.',
-                      DeprecationWarning)
-        super(SimpleConsumer, self).__init__(
-            client, group, topic,
-            partitions=partitions,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
-        if max_buffer_size is not None and buffer_size > max_buffer_size:
-            raise ValueError('buffer_size (%d) is greater than '
-                             'max_buffer_size (%d)' %
-                             (buffer_size, max_buffer_size))
-        self.buffer_size = buffer_size
-        self.max_buffer_size = max_buffer_size
-        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
-        self.fetch_min_bytes = fetch_size_bytes
-        self.fetch_offsets = self.offsets.copy()
-        self.iter_timeout = iter_timeout
-        self.auto_offset_reset = auto_offset_reset
-        self.queue = queue.Queue()
-
-    def __repr__(self):
-        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
-            (self.group, self.topic, str(self.offsets.keys()))
-
-    def reset_partition_offset(self, partition):
-        """Update offsets using auto_offset_reset policy (smallest|largest)
-
-        Arguments:
-            partition (int): the partition for which offsets should be updated
-
-        Returns: Updated offset on success, None on failure
-        """
-        LATEST = -1
-        EARLIEST = -2
-        if self.auto_offset_reset == 'largest':
-            reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
-        elif self.auto_offset_reset == 'smallest':
-            reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
-        else:
-            # Let's raise an reasonable exception type if user calls
-            # outside of an exception context
-            if sys.exc_info() == (None, None, None):
-                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
-                                            'valid auto_offset_reset setting '
-                                            '(largest|smallest)')
-            # Otherwise we should re-raise the upstream exception
-            # b/c it typically includes additional data about
-            # the request that triggered it, and we do not want to drop that
-            raise # pylint: disable=E0704
-
-        # send_offset_request
-        log.info('Resetting topic-partition offset to %s for %s:%d',
-                 self.auto_offset_reset, self.topic, partition)
-        try:
-            (resp, ) = self.client.send_offset_request(reqs)
-        except KafkaError as e:
-            log.error('%s sending offset request for %s:%d',
-                      e.__class__.__name__, self.topic, partition)
-        else:
-            self.offsets[partition] = resp.offsets[0]
-            self.fetch_offsets[partition] = resp.offsets[0]
-            return resp.offsets[0]
-
-    def seek(self, offset, whence=None, partition=None):
-        """
-        Alter the current offset in the consumer, similar to fseek
-
-        Arguments:
-            offset: how much to modify the offset
-            whence: where to modify it from, default is None
-
-                * None is an absolute offset
-                * 0 is relative to the earliest available offset (head)
-                * 1 is relative to the current offset
-                * 2 is relative to the latest known offset (tail)
-
-            partition: modify which partition, default is None.
-                If partition is None, would modify all partitions.
-        """
-
-        if whence is None: # set an absolute offset
-            if partition is None:
-                for tmp_partition in self.offsets:
-                    self.offsets[tmp_partition] = offset
-            else:
-                self.offsets[partition] = offset
-        elif whence == 1: # relative to current position
-            if partition is None:
-                for tmp_partition, _offset in self.offsets.items():
-                    self.offsets[tmp_partition] = _offset + offset
-            else:
-                self.offsets[partition] += offset
-        elif whence in (0, 2): # relative to beginning or end
-            reqs = []
-            deltas = {}
-            if partition is None:
-                # divide the request offset by number of partitions,
-                # distribute the remained evenly
-                (delta, rem) = divmod(offset, len(self.offsets))
-                for tmp_partition, r in izip_longest(self.offsets.keys(),
-                                                     repeat(1, rem),
-                                                     fillvalue=0):
-                    deltas[tmp_partition] = delta + r
-
-                for tmp_partition in self.offsets.keys():
-                    if whence == 0:
-                        reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
-                    elif whence == 2:
-                        reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
-                    else:
-                        pass
-            else:
-                deltas[partition] = offset
-                if whence == 0:
-                    reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
-                elif whence == 2:
-                    reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
-                else:
-                    pass
-
-            resps = self.client.send_offset_request(reqs)
-            for resp in resps:
-                self.offsets[resp.partition] = \
-                    resp.offsets[0] + deltas[resp.partition]
-        else:
-            raise ValueError('Unexpected value for `whence`, %d' % (whence,))
-
-        # Reset queue and fetch offsets since they are invalid
-        self.fetch_offsets = self.offsets.copy()
-        self.count_since_commit += 1
-        if self.auto_commit:
-            self.commit()
-
-        self.queue = queue.Queue()
-
-    def get_messages(self, count=1, block=True, timeout=0.1):
-        """
-        Fetch the specified number of messages
-
-        Keyword Arguments:
-            count: Indicates the maximum number of messages to be fetched
-            block: If True, the API will block till all messages are fetched.
-                If block is a positive integer the API will block until that
-                many messages are fetched.
-            timeout: When blocking is requested the function will block for
-                the specified time (in seconds) until count messages is
-                fetched. If None, it will block forever.
-        """
-        messages = []
-        if timeout is not None:
-            timeout += time.time()
-
-        new_offsets = {}
-        log.debug('getting %d messages', count)
-        while len(messages) < count:
-            block_time = timeout - time.time()
-            log.debug('calling _get_message block=%s timeout=%s', block, block_time)
-            block_next_call = block is True or block > len(messages)
-            result = self._get_message(block_next_call, block_time,
-                                       get_partition_info=True,
-                                       update_offset=False)
-            log.debug('got %s from _get_messages', result)
-            if not result:
-                if block_next_call and (timeout is None or time.time() <= timeout):
-                    continue
-                break
-
-            partition, message = result
-            _msg = (partition, message) if self.partition_info else message
-            messages.append(_msg)
-            new_offsets[partition] = message.offset + 1
-
-        # Update and commit offsets if necessary
-        self.offsets.update(new_offsets)
-        self.count_since_commit += len(messages)
-        self._auto_commit()
-        log.debug('got %d messages: %s', len(messages), messages)
-        return messages
-
-    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
-        return self._get_message(block, timeout, get_partition_info)
-
-    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
-                     update_offset=True):
-        """
-        If no messages can be fetched, returns None.
-        If get_partition_info is None, it defaults to self.partition_info
-        If get_partition_info is True, returns (partition, message)
-        If get_partition_info is False, returns message
-        """
-        start_at = time.time()
-        while self.queue.empty():
-            # We're out of messages, go grab some more.
-            log.debug('internal queue empty, fetching more messages')
-            with FetchContext(self, block, timeout):
-                self._fetch()
-
-            if not block or time.time() > (start_at + timeout):
-                break
-
-        try:
-            partition, message = self.queue.get_nowait()
-
-            if update_offset:
-                # Update partition offset
-                self.offsets[partition] = message.offset + 1
-
-            # Count, check and commit messages if necessary
-            self.count_since_commit += 1
-            self._auto_commit()
-
-            if get_partition_info is None:
-                get_partition_info = self.partition_info
-            if get_partition_info:
-                return partition, message
-            else:
-                return message
-        except queue.Empty:
-            log.debug('internal queue empty after fetch - returning None')
-            return None
-
-    def __iter__(self):
-        if self.iter_timeout is None:
-            timeout = ITER_TIMEOUT_SECONDS
-        else:
-            timeout = self.iter_timeout
-
-        while True:
-            message = self.get_message(True, timeout)
-            if message:
-                yield message
-            elif self.iter_timeout is None:
-                # We did not receive any message yet but we don't have a
-                # timeout, so give up the CPU for a while before trying again
-                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
-            else:
-                # Timed out waiting for a message
-                break
-
-    def _fetch(self):
-        # Create fetch request payloads for all the partitions
-        partitions = dict((p, self.buffer_size)
-                          for p in self.fetch_offsets.keys())
-        while partitions:
-            requests = []
-            for partition, buffer_size in six.iteritems(partitions):
-                requests.append(FetchRequestPayload(self.topic, partition,
-                                                    self.fetch_offsets[partition],
-                                                    buffer_size))
-            # Send request
-            responses = self.client.send_fetch_request(
-                requests,
-                max_wait_time=int(self.fetch_max_wait_time),
-                min_bytes=self.fetch_min_bytes,
-                fail_on_error=False
-            )
-
-            retry_partitions = {}
-            for resp in responses:
-
-                try:
-                    check_error(resp)
-                except UnknownTopicOrPartitionError:
-                    log.error('UnknownTopicOrPartitionError for %s:%d',
-                              resp.topic, resp.partition)
-                    self.client.reset_topic_metadata(resp.topic)
-                    raise
-                except NotLeaderForPartitionError:
-                    log.error('NotLeaderForPartitionError for %s:%d',
-                              resp.topic, resp.partition)
-                    self.client.reset_topic_metadata(resp.topic)
-                    continue
-                except OffsetOutOfRangeError:
-                    log.warning('OffsetOutOfRangeError for %s:%d. '
-                                'Resetting partition offset...',
-                                resp.topic, resp.partition)
-                    self.reset_partition_offset(resp.partition)
-                    # Retry this partition
-                    retry_partitions[resp.partition] = partitions[resp.partition]
-                    continue
-                except FailedPayloadsError as e:
-                    log.warning('FailedPayloadsError for %s:%d',
-                                e.payload.topic, e.payload.partition)
-                    # Retry this partition
-                    retry_partitions[e.payload.partition] = partitions[e.payload.partition]
-                    continue
-
-                partition = resp.partition
-                buffer_size = partitions[partition]
-
-                # Check for partial message
-                if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
-
-                    # If buffer is at max and all we got was a partial message
-                    # raise ConsumerFetchSizeTooSmall
-                    if (self.max_buffer_size is not None and
-                            buffer_size == self.max_buffer_size and
-                            len(resp.messages) == 1):
-
-                        log.error('Max fetch size %d too small', self.max_buffer_size)
-                        raise ConsumerFetchSizeTooSmall()
-
-                    if self.max_buffer_size is None:
-                        buffer_size *= 2
-                    else:
-                        buffer_size = min(buffer_size * 2, self.max_buffer_size)
-                    log.warning('Fetch size too small, increase to %d (2x) '
-                                'and retry', buffer_size)
-                    retry_partitions[partition] = buffer_size
-                    resp.messages.pop()
-
-                for message in resp.messages:
-                    if message.offset < self.fetch_offsets[partition]:
-                        log.debug('Skipping message %s because its offset is less than the consumer offset',
-                                  message)
-                        continue
-                    # Put the message in our queue
-                    self.queue.put((partition, message))
-                    self.fetch_offsets[partition] = message.offset + 1
-            partitions = retry_partitions