-rw-r--r--  README.rst        63
-rw-r--r--  docs/index.rst   102
-rw-r--r--  docs/simple.rst  144
-rw-r--r--  docs/usage.rst    96
4 files changed, 288 insertions, 117 deletions
diff --git a/README.rst b/README.rst
index 2bcc150..61b737f 100644
--- a/README.rst
+++ b/README.rst
@@ -12,39 +12,58 @@ Kafka Python client
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
->>> pip install kafka-python
+Python client for the Apache Kafka distributed stream processing system.
+kafka-python is designed to function much like the official java client, with a
+sprinkling of pythonic interfaces (e.g., consumer iterators).
+
+kafka-python is best used with 0.9 brokers, but is backwards-compatible with
+older versions (to 0.8.0). Some features will only be enabled on newer brokers,
+however; for example, fully coordinated consumer groups -- i.e., dynamic partition
+assignment to multiple consumers in the same group -- requires use of 0.9 kafka
+brokers. Supporting this feature for earlier broker releases would require
+writing and maintaining custom leadership election and membership / health
+check code (perhaps using zookeeper or consul). For older brokers, you can
+achieve something similar by manually assigning different partitions to each
+consumer instance with config management tools like chef, ansible, etc. This
+approach will work fine, though it does not support rebalancing on failures.
+See `Compatibility <http://kafka-python.readthedocs.org/en/master/compatibility.html>`_
+for more details.
-kafka-python is a client for the Apache Kafka distributed stream processing
-system. It is designed to function much like the official java client, with a
-sprinkling of pythonic interfaces (e.g., iterators).
+Please note that the master branch may contain unreleased features. For release
+documentation, please see readthedocs and/or python's inline help.
+>>> pip install kafka-python
KafkaConsumer
*************
+KafkaConsumer is a high-level message consumer, intended to operate as similarly
+as possible to the official 0.9 java client. Full support for coordinated
+consumer groups requires use of kafka brokers that support the 0.9 Group APIs.
+
+See `ReadTheDocs <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html>`_
+for API and configuration details.
+
+The consumer iterator returns ConsumerRecords, which are simple namedtuples
+that expose basic message attributes: topic, partition, offset, key, and value:
+
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
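+
+>>> # Join a consumer group for dynamic partition assignment and offset commits
+>>> # (a minimal sketch -- assumes 0.9+ brokers; the group name is a placeholder)
+>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+>>> for msg in consumer:
+...     print (msg)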
-KafkaConsumer is a full-featured,
-high-level message consumer class that is similar in design and function to the
-new 0.9 java consumer. Most configuration parameters defined by the official
-java client are supported as optional kwargs, with generally similar behavior.
-Gzip and Snappy compressed messages are supported transparently.
-
-In addition to the standard KafkaConsumer.poll() interface (which returns
-micro-batches of messages, grouped by topic-partition), kafka-python supports
-single-message iteration, yielding ConsumerRecord namedtuples, which include
-the topic, partition, offset, key, and value of each message.
-
-By default, KafkaConsumer will attempt to auto-commit
-message offsets every 5 seconds. When used with 0.9 kafka brokers,
-KafkaConsumer will dynamically assign partitions using
-the kafka GroupCoordinator APIs and a RoundRobinPartitionAssignor
-partitioning strategy, enabling relatively straightforward parallel consumption
-patterns. See `ReadTheDocs <http://kafka-python.readthedocs.org/master/>`_
-for examples.
+>>> # manually assign the partition list for the consumer
+>>> from kafka import TopicPartition
+>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+>>> consumer.assign([TopicPartition('foobar', 2)])
+>>> msg = next(consumer)
+
+>>> # Deserialize msgpack-encoded values
+>>> import msgpack
+>>> consumer = KafkaConsumer(value_deserializer=msgpack.unpackb)
+>>> consumer.subscribe(['msgpackfoo'])
+>>> for msg in consumer:
+...     assert isinstance(msg.value, dict)
KafkaProducer
diff --git a/docs/index.rst b/docs/index.rst
index f65d4db..2f54b09 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -12,47 +12,98 @@ kafka-python
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
->>> pip install kafka-python
-
-kafka-python is a client for the Apache Kafka distributed stream processing
-system. It is designed to function much like the official java client, with a
-sprinkling of pythonic interfaces (e.g., iterators).
+Python client for the Apache Kafka distributed stream processing system.
+kafka-python is designed to function much like the official java client, with a
+sprinkling of pythonic interfaces (e.g., consumer iterators).
+
+kafka-python is best used with 0.9 brokers, but is backwards-compatible with
+older versions (to 0.8.0). Some features will only be enabled on newer brokers,
+however; for example, fully coordinated consumer groups -- i.e., dynamic
+partition assignment to multiple consumers in the same group -- requires use of
+0.9 kafka brokers. Supporting this feature for earlier broker releases would
+require writing and maintaining custom leadership election and membership /
+health check code (perhaps using zookeeper or consul). For older brokers, you
+can achieve something similar by manually assigning different partitions to
+each consumer instance with config management tools like chef, ansible, etc.
+This approach will work fine, though it does not support rebalancing on
+failures. See `Compatibility <compatibility.html>`_ for more details.
+
+Please note that the master branch may contain unreleased features. For release
+documentation, please see readthedocs and/or python's inline help.
+>>> pip install kafka-python
KafkaConsumer
*************
+:class:`~kafka.KafkaConsumer` is a high-level message consumer, intended to
+operate as similarly as possible to the official 0.9 java client. Full support
+for coordinated consumer groups requires use of kafka brokers that support the
+0.9 Group APIs.
+
+See `KafkaConsumer <apidoc/KafkaConsumer.html>`_ for API and configuration details.
+
+The consumer iterator returns ConsumerRecords, which are simple namedtuples
+that expose basic message attributes: topic, partition, offset, key, and value:
+
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
-:class:`~kafka.consumer.KafkaConsumer` is a full-featured,
-high-level message consumer class that is similar in design and function to the
-new 0.9 java consumer. Most configuration parameters defined by the official
-java client are supported as optional kwargs, with generally similar behavior.
-Gzip and Snappy compressed messages are supported transparently.
-
-In addition to the standard
-:meth:`~kafka.consumer.KafkaConsumer.poll` interface (which returns
-micro-batches of messages, grouped by topic-partition), kafka-python supports
-single-message iteration, yielding :class:`~kafka.consumer.ConsumerRecord`
-namedtuples, which include the topic, partition, offset, key, and value of each
-message.
+>>> # manually assign the partition list for the consumer
+>>> from kafka import TopicPartition
+>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+>>> consumer.assign([TopicPartition('foobar', 2)])
+>>> msg = next(consumer)
-By default, :class:`~kafka.consumer.KafkaConsumer` will attempt to auto-commit
-message offsets every 5 seconds. When used with 0.9 kafka brokers,
-:class:`~kafka.consumer.KafkaConsumer` will dynamically assign partitions using
-the kafka GroupCoordinator APIs and a
-:class:`~kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor`
-partitioning strategy, enabling relatively straightforward parallel consumption
-patterns. See :doc:`usage` for examples.
+>>> # Deserialize msgpack-encoded values
+>>> import msgpack
+>>> consumer = KafkaConsumer(value_deserializer=msgpack.unpackb)
+>>> consumer.subscribe(['msgpackfoo'])
+>>> for msg in consumer:
+...     assert isinstance(msg.value, dict)
KafkaProducer
*************
-TBD
+:class:`~kafka.KafkaProducer` is a high-level, asynchronous message producer.
+The class is intended to operate as similarly as possible to the official java
+client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
+
+>>> from kafka import KafkaProducer
+>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
+>>> producer.send('foobar', b'some_message_bytes')
+
+>>> # Blocking send
+>>> producer.send('foobar', b'another_message').get(timeout=60)
+
+>>> # Use a key for hashed-partitioning
+>>> producer.send('foobar', key=b'foo', value=b'bar')
+
+>>> # Serialize json messages
+>>> import json
+>>> producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
+>>> producer.send('fizzbuzz', {'foo': 'bar'})
+
+>>> # Serialize string keys
+>>> producer = KafkaProducer(key_serializer=str.encode)
+>>> producer.send('flipflap', key='ping', value=b'1234')
+
+>>> # Compress messages
+>>> producer = KafkaProducer(compression_type='gzip')
+>>> for i in range(1000):
+...     producer.send('foobar', ('msg %d' % i).encode('utf-8'))
+
+
+Compression
+***********
+
+kafka-python supports gzip compression/decompression natively. To produce or
+consume snappy- and lz4-compressed messages, you must install lz4 (lz4-cffi
+if using pypy) and/or python-snappy (which also requires the snappy system
+library). See `Installation <install.html#optional-snappy-install>`_ for more
+information.
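+
+As a minimal sketch (assuming python-snappy and the underlying snappy system
+library are installed), compressed production is just another configuration
+option:
+
+>>> producer = KafkaProducer(compression_type='snappy')
+>>> producer.send('foobar', b'snappy-compressed message bytes')
+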
Protocol
@@ -78,6 +129,7 @@ SimpleConsumer and SimpleProducer.
:maxdepth: 2
Usage Overview <usage>
+ Simple Clients [deprecated] <simple>
API </apidoc/modules>
install
tests
diff --git a/docs/simple.rst b/docs/simple.rst
new file mode 100644
index 0000000..00a21ac
--- /dev/null
+++ b/docs/simple.rst
@@ -0,0 +1,144 @@
+Simple APIs (DEPRECATED)
+************************
+
+
+SimpleConsumer
+==============
+
+.. code:: python
+
+ from kafka import SimpleConsumer, SimpleClient
+
+ # To consume messages
+ client = SimpleClient('localhost:9092')
+ consumer = SimpleConsumer(client, "my-group", "my-topic")
+ for message in consumer:
+     # message is raw byte string -- decode if necessary!
+     # e.g., for unicode: `message.decode('utf-8')`
+     print(message)
+
+
+ # Use multiprocessing for parallel consumers
+ from kafka import MultiProcessConsumer
+
+ # This will split the number of partitions among two processes
+ consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=2)
+
+ # This will spawn processes such that each handles 2 partitions max
+ consumer = MultiProcessConsumer(client, "my-group", "my-topic",
+                                 partitions_per_proc=2)
+
+ for message in consumer:
+     print(message)
+
+ for message in consumer.get_messages(count=5, block=True, timeout=4):
+     print(message)
+
+ client.close()
+
+
+SimpleProducer
+==============
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
+
+ from kafka import SimpleProducer, SimpleClient
+
+ # To send messages asynchronously
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=True)
+ producer.send_messages('my-topic', b'async message')
+
+ # To send messages in batches. You can use any of the available
+ # producers for doing this. The following producer will collect
+ # messages in batches and send them to Kafka after 20 messages are
+ # collected or every 60 seconds, whichever comes first.
+ # Notes:
+ # * If the producer dies before the messages are sent, there will be losses
+ # * Call producer.stop() to send the pending messages and clean up
+ producer = SimpleProducer(client,
+                           async=True,
+                           batch_send_every_n=20,
+                           batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+ import logging
+
+ from kafka import SimpleProducer, SimpleClient
+
+ # To send messages synchronously
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=False)
+
+ # Note that the application is responsible for encoding messages to type bytes
+ producer.send_messages('my-topic', b'some message')
+ producer.send_messages('my-topic', b'this method', b'is variadic')
+
+ # Send unicode message
+ producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
+
+ # To wait for acknowledgements:
+ # ACK_AFTER_LOCAL_WRITE : server will wait until the data is written to
+ #                         a local log before sending a response
+ # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+ #                            by all in-sync replicas before sending a response
+ producer = SimpleProducer(client,
+                           async=False,
+                           req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                           ack_timeout=2000,
+                           sync_fail_on_error=False)
+
+ responses = producer.send_messages('my-topic', b'another message')
+ for r in responses:
+     logging.info(r.offset)
+
+
+KeyedProducer
+=============
+
+.. code:: python
+
+ from kafka import (
+     SimpleClient, KeyedProducer,
+     Murmur2Partitioner, RoundRobinPartitioner)
+
+ kafka = SimpleClient('localhost:9092')
+
+ # HashedPartitioner is default (currently uses python hash())
+ producer = KeyedProducer(kafka)
+ producer.send_messages(b'my-topic', b'key1', b'some message')
+ producer.send_messages(b'my-topic', b'key2', b'this method')
+
+ # Murmur2Partitioner attempts to mirror the java client hashing
+ producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+ # Or just produce round-robin (or just use SimpleProducer)
+ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+
+SimpleClient
+============
+
+
+.. code:: python
+
+ from kafka import SimpleClient, create_message
+ from kafka.protocol import KafkaProtocol
+ from kafka.common import ProduceRequest
+
+ kafka = SimpleClient("localhost:9092")
+
+ req = ProduceRequest(topic="my-topic", partition=1,
+                      messages=[create_message(b"some message")])
+ resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
+ kafka.close()
+
+ resps[0].topic # "my-topic"
+ resps[0].partition # 1
+ resps[0].error # 0 (hopefully)
+ resps[0].offset # offset of the first message sent in this request
diff --git a/docs/usage.rst b/docs/usage.rst
index e74e5af..f2bea06 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -50,85 +50,41 @@ There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
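+
+A short configuration sketch (the kwargs below are standard
+:class:`~kafka.KafkaConsumer` options; the values shown are illustrative
+placeholders only):
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    consumer = KafkaConsumer('my-topic',
+                             group_id='my-group',
+                             bootstrap_servers=['localhost:9092'],
+                             auto_offset_reset='earliest',
+                             enable_auto_commit=True)
+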
-SimpleProducer
+KafkaProducer
==============
-Asynchronous Mode
------------------
-
-.. code:: python
-
- from kafka import SimpleProducer, SimpleClient
-
- # To send messages asynchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=True)
- producer.send_messages('my-topic', b'async message')
-
- # To send messages in batch. You can use any of the available
- # producers for doing this. The following producer will collect
- # messages in batch and send them to Kafka after 20 messages are
- # collected or every 60 seconds
- # Notes:
- # * If the producer dies before the messages are sent, there will be losses
- # * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(client,
- async=True,
- batch_send_every_n=20,
- batch_send_every_t=60)
-
-Synchronous Mode
-----------------
-
.. code:: python
- from kafka import SimpleProducer, SimpleClient
+ import logging
+
+ from kafka import KafkaProducer
+ from kafka.common import KafkaError
- # To send messages synchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=False)
+ producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
- # Note that the application is responsible for encoding messages to type bytes
- producer.send_messages('my-topic', b'some message')
- producer.send_messages('my-topic', b'this method', b'is variadic')
+ # Asynchronous by default
+ future = producer.send('my-topic', b'raw_bytes')
- # Send unicode message
- producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
-
- # To wait for acknowledgements
- # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
- # a local log before sending response
- # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
- # by all in sync replicas before sending a response
- producer = SimpleProducer(client,
- async=False,
- req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=2000,
- sync_fail_on_error=False)
-
- responses = producer.send_messages('my-topic', b'another message')
- for r in responses:
- logging.info(r.offset)
-
-
-KeyedProducer
-=============
-
-.. code:: python
+ # Block for 'synchronous' sends
+ try:
+     record_metadata = future.get(timeout=10)
+ except KafkaError:
+     # Decide what to do if produce request failed...
+     logging.exception('produce request failed')
+     pass
- from kafka import (
- SimpleClient, KeyedProducer,
- Murmur2Partitioner, RoundRobinPartitioner)
+ # Successful result returns assigned partition and offset
+ print (record_metadata.topic)
+ print (record_metadata.partition)
+ print (record_metadata.offset)
- kafka = SimpleClient('localhost:9092')
+ # produce keyed messages to enable hashed partitioning
+ producer.send('my-topic', key=b'foo', value=b'bar')
- # HashedPartitioner is default (currently uses python hash())
- producer = KeyedProducer(kafka)
- producer.send_messages(b'my-topic', b'key1', b'some message')
- producer.send_messages(b'my-topic', b'key2', b'this methode')
+ # encode objects via msgpack
+ import msgpack
+ producer = KafkaProducer(value_serializer=msgpack.dumps)
+ producer.send('msgpack-topic', {'key': 'value'})
- # Murmur2Partitioner attempts to mirror the java client hashing
- producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+ # produce json messages
+ import json
+ producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
+ producer.send('json-topic', {'key': 'value'})
- # Or just produce round-robin (or just use SimpleProducer)
- producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+ # configure multiple retries
+ producer = KafkaProducer(retries=5)