author     Dana Powers <dana.powers@gmail.com>    2015-03-29 18:09:03 -0700
committer  Dana Powers <dana.powers@gmail.com>    2015-03-29 18:09:03 -0700
commit     bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc (patch)
tree       ccb16a62c4da6d1b6e33801289fdbcfdf3c0713f
parent     fd204dca174033e36899a0e20d2ce7ebccf11ddb (diff)
parent     35b8f5b5d8b0888806d5d6c9ec02910327c3a671 (diff)
download   kafka-python-bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc.tar.gz
Merge pull request #341 from dpkp/kafka_consumer_docs
KafkaConsumer documentation
-rw-r--r--  docs/requirements.txt                 1
-rw-r--r--  docs/usage.rst                      103
-rw-r--r--  kafka/consumer/kafka.py             228
-rw-r--r--  test/test_consumer_integration.py     2
4 files changed, 185 insertions(+), 149 deletions(-)
diff --git a/docs/requirements.txt b/docs/requirements.txt
index 86b4f05..d32365f 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -1,5 +1,6 @@
sphinx
sphinxcontrib-napoleon
+sphinx_rtd_theme
# Install kafka-python in editable mode
# This allows the sphinx autodoc module
diff --git a/docs/usage.rst b/docs/usage.rst
index 141cf93..150d121 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,12 +1,12 @@
Usage
=====
-High level
-----------
+SimpleProducer
+--------------
.. code:: python
- from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+ from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
batch_send_every_n=20,
batch_send_every_t=60)
- # To consume messages
- consumer = KafkaConsumer("my-topic", group_id="my_group",
- metadata_broker_list=["localhost:9092"])
- for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
-
- kafka.close()
-
-
Keyed messages
--------------
@@ -80,6 +69,92 @@ Keyed messages
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+KafkaConsumer
+-------------
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # To consume messages
+    consumer = KafkaConsumer("my-topic",
+                             group_id="my_group",
+                             bootstrap_servers=["localhost:9092"])
+    for message in consumer:
+        # message value is a raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.value.decode('utf-8')`
+        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                             message.offset, message.key,
+                                             message.value))
+
+
+Messages (m) are namedtuples with the following attributes:
+
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
+
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # more advanced consumer -- multiple topics w/ auto commit offset
+    # management
+    consumer = KafkaConsumer('topic1', 'topic2',
+                             bootstrap_servers=['localhost:9092'],
+                             group_id='my_consumer_group',
+                             auto_commit_enable=True,
+                             auto_commit_interval_ms=30 * 1000,
+                             auto_offset_reset='smallest')
+
+    # Infinite iteration
+    for m in consumer:
+        do_some_work(m)
+
+        # Mark this message as fully consumed
+        # so it can be included in the next commit
+        #
+        # **messages that are not marked w/ task_done currently do not commit!
+        consumer.task_done(m)
+
+    # If auto_commit_enable is False, remember to commit() periodically
+    consumer.commit()
+
+    # Batch process interface
+    while True:
+        for m in consumer.fetch_messages():
+            process_message(m)
+            consumer.task_done(m)
+
+
+ Configuration settings can be passed to the constructor;
+ otherwise defaults will be used:
+
+.. code:: python
+
+    client_id='kafka.consumer.kafka',
+    group_id=None,
+    fetch_message_max_bytes=1024*1024,
+    fetch_min_bytes=1,
+    fetch_wait_max_ms=100,
+    refresh_leader_backoff_ms=200,
+    bootstrap_servers=[],
+    socket_timeout_ms=30*1000,
+    auto_offset_reset='largest',
+    deserializer_class=lambda msg: msg,
+    auto_commit_enable=False,
+    auto_commit_interval_ms=60 * 1000,
+    consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
+
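The `deserializer_class` setting listed above can replace the manual decode step shown in the first example. The sketch below is illustrative only and not part of this patch: the topic name is hypothetical, and the producers are assumed to publish UTF-8 encoded JSON values.

.. code:: python

    import json

    from kafka import KafkaConsumer

    # deserializer_class is applied to every raw message value before it
    # is exposed as message.value
    consumer = KafkaConsumer("json-topic",  # hypothetical topic
                             bootstrap_servers=["localhost:9092"],
                             deserializer_class=lambda m: json.loads(m.decode("utf-8")))

    for message in consumer:
        # message.value is now a decoded Python object rather than raw bytes
        print("%s:%d: %r" % (message.topic, message.offset, message.value))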
Multiprocess consumer
---------------------
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index f03d15e..6f5bcdd 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -52,114 +52,59 @@ DEPRECATED_CONFIG_KEYS = {
}
class KafkaConsumer(object):
- """
- A simpler kafka consumer
-
- .. code:: python
-
- # A very basic 'tail' consumer, with no stored offset management
- kafka = KafkaConsumer('topic1',
- bootstrap_servers=['localhost:9092'])
- for m in kafka:
- print m
-
- # Alternate interface: next()
- print kafka.next()
-
- # Alternate interface: batch iteration
- while True:
- for m in kafka.fetch_messages():
- print m
- print "Done with batch - let's do another!"
-
-
- .. code:: python
-
- # more advanced consumer -- multiple topics w/ auto commit offset
- # management
- kafka = KafkaConsumer('topic1', 'topic2',
- bootstrap_servers=['localhost:9092'],
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in kafka:
- process_message(m)
- kafka.task_done(m)
-
- # Alternate interface: next()
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
-
- # If auto_commit_enable is False, remember to commit() periodically
- kafka.commit()
-
- # Batch process interface
- while True:
- for m in kafka.fetch_messages():
- process_message(m)
- kafka.task_done(m)
-
-
- messages (m) are namedtuples with attributes:
-
- * `m.topic`: topic name (str)
- * `m.partition`: partition number (int)
- * `m.offset`: message offset on topic-partition log (int)
- * `m.key`: key (bytes - can be None)
- * `m.value`: message (output of deserializer_class - default is raw bytes)
-
- Configuration settings can be passed to constructor,
- otherwise defaults will be used:
-
- .. code:: python
-
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- bootstrap_servers=[],
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
-
- Configuration parameters are described in more detail at
- http://kafka.apache.org/documentation.html#highlevelconsumerapi
- """
+ """A simpler kafka consumer"""
def __init__(self, *topics, **configs):
self.configure(**configs)
self.set_topic_partitions(*topics)
def configure(self, **configs):
- """
+ """Configure the consumer instance
+
Configuration settings can be passed to constructor,
otherwise defaults will be used:
- .. code:: python
-
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- bootstrap_servers=[],
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- auto_commit_interval_messages=None,
- consumer_timeout_ms=-1
+ Keyword Arguments:
+ bootstrap_servers (list): List of initial broker nodes the consumer
+ should contact to bootstrap initial cluster metadata. This does
+ not have to be the full node list. It just needs to have at
+ least one broker that will respond to a Metadata API Request.
+ client_id (str): a unique name for this client. Defaults to
+ 'kafka.consumer.kafka'.
+ group_id (str): the name of the consumer group to join.
+ Offsets are fetched / committed to this group name.
+ fetch_message_max_bytes (int, optional): Maximum bytes for each
+ topic/partition fetch request. Defaults to 1024*1024.
+ fetch_min_bytes (int, optional): Minimum amount of data the server
+ should return for a fetch request, otherwise wait up to
+ fetch_wait_max_ms for more data to accumulate. Defaults to 1.
+ fetch_wait_max_ms (int, optional): Maximum time for the server to
+ block waiting for fetch_min_bytes messages to accumulate.
+ Defaults to 100.
+ refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
+ when refreshing metadata on errors (subject to random jitter).
+ Defaults to 200.
+ socket_timeout_ms (int, optional): TCP socket timeout in
+ milliseconds. Defaults to 30*1000.
+ auto_offset_reset (str, optional): A policy for resetting offsets on
+ OffsetOutOfRange errors. 'smallest' will move to the oldest
+ available message, 'largest' will move to the most recent. Any
+ other value will raise the exception. Defaults to 'largest'.
+ deserializer_class (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value. Defaults to
+ lambda msg: msg.
+ auto_commit_enable (bool, optional): Enabling auto-commit will cause
+ the KafkaConsumer to periodically commit offsets without an
+ explicit call to commit(). Defaults to False.
+ auto_commit_interval_ms (int, optional): If auto_commit_enable is
+ True, the number of milliseconds between automatic offset commits.
+ Defaults to 60 * 1000.
+ auto_commit_interval_messages (int, optional): If auto_commit_enable
+ is True, the number of messages consumed between automatic offset
+ commits. Defaults to None (disabled).
+ consumer_timeout_ms (int, optional): number of milliseconds to wait
+ before raising a timeout exception if no message is available
+ for consumption. Defaults to -1 (don't raise an exception).
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
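Because the constructor shown earlier in this diff simply calls configure() followed by set_topic_partitions(), the keyword arguments documented here can also be applied to an existing instance. A hedged sketch, not taken from this patch; the broker address and topic names are assumptions.

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("topic1", bootstrap_servers=["localhost:9092"])

    # reconfigure the same instance -- this repeats what __init__ does
    consumer.configure(bootstrap_servers=["localhost:9092"],
                       group_id="my_consumer_group",
                       auto_commit_enable=True,
                       auto_commit_interval_ms=30 * 1000)
    consumer.set_topic_partitions("topic1", "topic2")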
@@ -316,18 +261,18 @@ class KafkaConsumer(object):
self._reset_message_iterator()
def next(self):
- """
- Return a single message from the message iterator
- If consumer_timeout_ms is set, will raise ConsumerTimeout
- if no message is available
- Otherwise blocks indefinitely
+ """Return the next available message
- Note that this is also the method called internally during iteration:
+ Blocks indefinitely unless consumer_timeout_ms > 0
- .. code:: python
+ Returns:
+ a single KafkaMessage from the message iterator
- for m in consumer:
- pass
+ Raises:
+ ConsumerTimeout after consumer_timeout_ms if no message is available
+
+ Note:
+ This is also the method called internally during iteration
"""
self._set_consumer_timeout_start()
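A minimal sketch of the explicit next() interface described by this docstring (not part of the patch). It assumes a local broker with an existing "my-topic", and that ConsumerTimeout can be imported from kafka.common.

.. code:: python

    from kafka import KafkaConsumer
    from kafka.common import ConsumerTimeout  # assumed import location

    # with consumer_timeout_ms > 0, next() raises instead of blocking forever
    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=["localhost:9092"],
                             consumer_timeout_ms=1000)

    try:
        while True:
            message = consumer.next()  # the same call the iterator uses internally
            print("%d: %r" % (message.offset, message.value))
    except ConsumerTimeout:
        print("no message received within 1 second")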
@@ -343,21 +288,24 @@ class KafkaConsumer(object):
self._check_consumer_timeout()
def fetch_messages(self):
- """
- Sends FetchRequests for all topic/partitions set for consumption
- Returns a generator that yields KafkaMessage structs
- after deserializing with the configured `deserializer_class`
+ """Sends FetchRequests for all topic/partitions set for consumption
+
+ Returns:
+ Generator that yields KafkaMessage structs
+ after deserializing with the configured `deserializer_class`
- Refreshes metadata on errors, and resets fetch offset on
- OffsetOutOfRange, per the configured `auto_offset_reset` policy
+ Note:
+ Refreshes metadata on errors, and resets fetch offset on
+ OffsetOutOfRange, per the configured `auto_offset_reset` policy
- Key configuration parameters:
+ See Also:
+ Key KafkaConsumer configuration parameters:
+ * `fetch_message_max_bytes`
+ * `fetch_wait_max_ms`
+ * `fetch_min_bytes`
+ * `deserializer_class`
+ * `auto_offset_reset`
- * `fetch_message_max_bytes`
- * `fetch_max_wait_ms`
- * `fetch_min_bytes`
- * `deserializer_class`
- * `auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
@@ -436,21 +384,22 @@ class KafkaConsumer(object):
yield msg
def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
- """
- Request available fetch offsets for a single topic/partition
+ """Request available fetch offsets for a single topic/partition
- Arguments:
- topic (str)
- partition (int)
+ Keyword Arguments:
+ topic (str): topic for offset request
+ partition (int): partition for offset request
request_time_ms (int): Used to ask for all messages before a
certain time (ms). There are two special values. Specify -1 to receive the latest
offset (i.e. the offset of the next coming message) and -2 to receive the earliest
available offset. Note that because offsets are pulled in descending order, asking for
the earliest offset will always return you a single element.
- max_num_offsets (int)
+ max_num_offsets (int): Maximum offsets to include in the OffsetResponse
Returns:
- offsets (list)
+ a list of offsets from the OffsetResponse returned for the provided
+ topic / partition. See:
+ https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
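To illustrate the two special request_time_ms values described above, a sketch (not from this patch) assuming "my-topic" has a partition 0 on a local broker:

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("my-topic", bootstrap_servers=["localhost:9092"])

    # -2 requests the earliest available offset; because offsets are pulled
    # in descending order, a single element is returned
    (earliest,) = consumer.get_partition_offsets("my-topic", 0, -2, 1)

    # -1 requests the latest offset, i.e. the offset of the next message
    (latest,) = consumer.get_partition_offsets("my-topic", 0, -1, 1)

    print("%d messages available in my-topic/0" % (latest - earliest))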
@@ -466,7 +415,8 @@ class KafkaConsumer(object):
return resp.offsets
def offsets(self, group=None):
- """
+ """Get internal consumer offset values
+
Keyword Arguments:
group: Either "fetch", "commit", "task_done", or "highwater".
If no group specified, returns all groups.
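A small sketch of inspecting these internal offsets (illustrative only, not part of the patch; the (topic, partition) key format is an assumption based on the tuples used elsewhere in this module):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=["localhost:9092"],
                             group_id="my_consumer_group")

    # snapshot of this consumer's internal offset bookkeeping;
    # keys are assumed to be (topic, partition) tuples
    print(consumer.offsets("fetch"))  # offsets to be fetched next
    print(consumer.offsets())         # all groups: fetch / commit / task_done / highwater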
@@ -485,10 +435,17 @@ class KafkaConsumer(object):
return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
- """
- Mark a fetched message as consumed.
+ """Mark a fetched message as consumed.
+
Offsets for messages marked as "task_done" will be stored back
to the kafka cluster for this consumer group on commit()
+
+ Arguments:
+ message (KafkaMessage): the message to mark as complete
+
+ Returns:
+ Nothing
+
"""
topic_partition = (message.topic, message.partition)
offset = message.offset
@@ -516,12 +473,15 @@ class KafkaConsumer(object):
self.commit()
def commit(self):
- """
- Store consumed message offsets (marked via task_done())
+ """Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
- **Note**: this functionality requires server version >=0.8.1.1
- See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
+ Returns:
+ True on success, or False if no offsets were found for commit
+
+ Note:
+ this functionality requires server version >=0.8.1.1
+ https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
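Putting task_done() and commit() together, a hedged sketch of manual offset management (not part of this patch; process() and the topic / group names are placeholders, and committing requires a broker >= 0.8.1.1 per the note above):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=["localhost:9092"],
                             group_id="my_consumer_group",
                             auto_commit_enable=False)

    for i, message in enumerate(consumer):
        process(message)             # hypothetical message handler
        consumer.task_done(message)  # only task_done'd offsets are committed

        if i % 100 == 99:
            # commit() returns True on success, False if no offsets were stored
            if not consumer.commit():
                print("nothing to commit")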
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d3df56a..17a8ac9 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -69,7 +69,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def kafka_consumer(self, **configs):
brokers = '%s:%d' % (self.server.host, self.server.port)
consumer = KafkaConsumer(self.topic,
- metadata_broker_list=brokers,
+ bootstrap_servers=brokers,
**configs)
return consumer