summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-09 16:42:23 -0700
committerDana Powers <dana.powers@rd.io>2014-09-10 22:14:29 -0700
commit27e812e66bd04d3acf59ad6792b07d1c7056c036 (patch)
treea9960a0ec8d7fd4e1452a09f0ab7fcd69f6d0494 /kafka/consumer/simple.py
parentf806c24d849928c95342d2b82a9d2d9507ba175a (diff)
downloadkafka-python-27e812e66bd04d3acf59ad6792b07d1c7056c036.tar.gz
Separate consumers/producers/partitioners
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py318
1 files changed, 318 insertions, 0 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
new file mode 100644
index 0000000..dcc71a9
--- /dev/null
+++ b/kafka/consumer/simple.py
@@ -0,0 +1,318 @@
+from __future__ import absolute_import
+
+try:
+ from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
+except ImportError: # python 2
+ from itertools import izip_longest as izip_longest, repeat
+import logging
+import time
+
+import six
+
+try:
+ from Queue import Empty, Queue
+except ImportError: # python 2
+ from queue import Empty, Queue
+
+from kafka.common import (
+ FetchRequest, OffsetRequest,
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+)
+from .base import (
+ Consumer,
+ FETCH_DEFAULT_BLOCK_TIMEOUT,
+ AUTO_COMMIT_MSG_COUNT,
+ AUTO_COMMIT_INTERVAL,
+ FETCH_MIN_BYTES,
+ FETCH_BUFFER_SIZE_BYTES,
+ MAX_FETCH_BUFFER_SIZE_BYTES,
+ FETCH_MAX_WAIT_TIME,
+ ITER_TIMEOUT_SECONDS,
+ NO_MESSAGES_WAIT_TIME_SECONDS
+)
+
+log = logging.getLogger("kafka")
+
+class FetchContext(object):
+ """
+ Class for managing the state of a consumer during fetch
+ """
+ def __init__(self, consumer, block, timeout):
+ self.consumer = consumer
+ self.block = block
+
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
+
+ def __enter__(self):
+ """Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
+ if self.block:
+ self.consumer.fetch_max_wait_time = self.timeout
+ self.consumer.fetch_min_bytes = 1
+ else:
+ self.consumer.fetch_min_bytes = 0
+
+ def __exit__(self, type, value, traceback):
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
+
+
+class SimpleConsumer(Consumer):
+ """
+ A simple consumer implementation that consumes all/specified partitions
+ for a topic
+
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+ partitions: An optional list of partitions to consume the data from
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
+
+ Auto commit details:
+ If both auto_commit_every_n and auto_commit_every_t are set, they will
+ reset one another when one is triggered. These triggers simply call the
+ commit method on this class. A manual call to commit will also reset
+ these triggers
+ """
+ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
+
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = fetch_size_bytes
+ self.fetch_offsets = self.offsets.copy()
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
+
+ def __repr__(self):
+ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+ (self.group, self.topic, str(self.offsets.keys()))
+
+ def provide_partition_info(self):
+ """
+ Indicates that partition info must be returned by the consumer
+ """
+ self.partition_info = True
+
+ def seek(self, offset, whence):
+ """
+ Alter the current offset in the consumer, similar to fseek
+
+ offset: how much to modify the offset
+ whence: where to modify it from
+ 0 is relative to the earliest available offset (head)
+ 1 is relative to the current offset
+ 2 is relative to the latest known offset (tail)
+ """
+
+ if whence == 1: # relative to current position
+ for partition, _offset in self.offsets.items():
+ self.offsets[partition] = _offset + offset
+ elif whence in (0, 2): # relative to beginning or end
+ # divide the request offset by number of partitions,
+ # distribute the remained evenly
+ (delta, rem) = divmod(offset, len(self.offsets))
+ deltas = {}
+ for partition, r in izip_longest(self.offsets.keys(),
+ repeat(1, rem), fillvalue=0):
+ deltas[partition] = delta + r
+
+ reqs = []
+ for partition in self.offsets.keys():
+ if whence == 0:
+ reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ elif whence == 2:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ else:
+ pass
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ self.offsets[resp.partition] = \
+ resp.offsets[0] + deltas[resp.partition]
+ else:
+ raise ValueError("Unexpected value for `whence`, %d" % whence)
+
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ if self.auto_commit:
+ self.count_since_commit += 1
+ self.commit()
+
+ self.queue = Queue()
+
+ def get_messages(self, count=1, block=True, timeout=0.1):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
+ """
+ messages = []
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
+ count -= 1
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout is not None:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+ return messages
+
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
+ """
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
+ """
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ partition, message = self.queue.get_nowait()
+
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
+ except Empty:
+ return None
+
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
+
+ while True:
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+ else:
+ # Timed out waiting for a message
+ break
+
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ partitions = dict((p, self.buffer_size)
+ for p in self.fetch_offsets.keys())
+ while partitions:
+ requests = []
+ for partition, buffer_size in six.iteritems(partitions):
+ requests.append(FetchRequest(self.topic, partition,
+ self.fetch_offsets[partition],
+ buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = {}
+ for resp in responses:
+ partition = resp.partition
+ buffer_size = partitions[partition]
+ try:
+ for message in resp.messages:
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
+ except ConsumerFetchSizeTooSmall:
+ if (self.max_buffer_size is not None and
+ buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise
+ if self.max_buffer_size is None:
+ buffer_size *= 2
+ else:
+ buffer_size = max(buffer_size * 2,
+ self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", buffer_size)
+ retry_partitions[partition] = buffer_size
+ except ConsumerNoMoreData as e:
+ log.debug("Iteration was ended by %r", e)
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
+ partitions = retry_partitions