-rw-r--r--  README.rst       |  63
-rw-r--r--  docs/index.rst   | 102
-rw-r--r--  docs/simple.rst  | 144
-rw-r--r--  docs/usage.rst   |  96
4 files changed, 288 insertions, 117 deletions
@@ -12,39 +12,58 @@ Kafka Python client .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE ->>> pip install kafka-python +Python client for the Apache Kafka distributed stream processing system. +kafka-python is designed to function much like the official java client, with a +sprinkling of pythonic interfaces (e.g., consumer iterators). + +kafka-python is best used with 0.9 brokers, but is backwards-compatible with +older versions (to 0.8.0). Some features will only be enabled on newer brokers, +however; for example, fully coordinated consumer groups -- i.e., dynamic partition +assignment to multiple consumers in the same group -- requires use of 0.9 kafka +brokers. Supporting this feature for earlier broker releases would require +writing and maintaining custom leadership election and membership / health +check code (perhaps using zookeeper or consul). For older brokers, you can +achieve something similar by manually assigning different partitions to each +consumer instance with config management tools like chef, ansible, etc. This +approach will work fine, though it does not support rebalancing on failures. +See `Compatibility <http://kafka-python.readthedocs.org/en/master/compatibility.html>`_ +for more details. -kafka-python is a client for the Apache Kafka distributed stream processing -system. It is designed to function much like the official java client, with a -sprinkling of pythonic interfaces (e.g., iterators). +Please note that the master branch may contain unreleased features. For release +documentation, please see readthedocs and/or python's inline help. +>>> pip install kafka-python KafkaConsumer ************* +KafkaConsumer is a high-level message consumer, intended to operate as similarly +as possible to the official 0.9 java client. Full support for coordinated +consumer groups requires use of kafka brokers that support the 0.9 Group APIs. + +See `ReadTheDocs <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html>`_ +for API and configuration details. + +The consumer iterator returns ConsumerRecords, which are simple namedtuples +that expose basic message attributes: topic, partition, offset, key, and value: + >>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic') >>> for msg in consumer: ... print (msg) -KafkaConsumer is a full-featured, -high-level message consumer class that is similar in design and function to the -new 0.9 java consumer. Most configuration parameters defined by the official -java client are supported as optional kwargs, with generally similar behavior. -Gzip and Snappy compressed messages are supported transparently. - -In addition to the standard KafkaConsumer.poll() interface (which returns -micro-batches of messages, grouped by topic-partition), kafka-python supports -single-message iteration, yielding ConsumerRecord namedtuples, which include -the topic, partition, offset, key, and value of each message. - -By default, KafkaConsumer will attempt to auto-commit -message offsets every 5 seconds. When used with 0.9 kafka brokers, -KafkaConsumer will dynamically assign partitions using -the kafka GroupCoordinator APIs and a RoundRobinPartitionAssignor -partitioning strategy, enabling relatively straightforward parallel consumption -patterns. See `ReadTheDocs <http://kafka-python.readthedocs.org/master/>`_ -for examples. 
+>>> # manually assign the partition list for the consumer
+>>> from kafka import TopicPartition
+>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+>>> consumer.assign([TopicPartition('foobar', 2)])
+>>> msg = next(consumer)
+
+>>> # Deserialize msgpack-encoded values
+>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
+>>> consumer.subscribe(['msgpackfoo'])
+>>> for msg in consumer:
+...     assert isinstance(msg.value, dict)

KafkaProducer

diff --git a/docs/index.rst b/docs/index.rst
index f65d4db..2f54b09 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -12,47 +12,98 @@ kafka-python

 .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE

->>> pip install kafka-python
-
-kafka-python is a client for the Apache Kafka distributed stream processing
-system. It is designed to function much like the official java client, with a
-sprinkling of pythonic interfaces (e.g., iterators).
+Python client for the Apache Kafka distributed stream processing system.
+kafka-python is designed to function much like the official java client, with a
+sprinkling of pythonic interfaces (e.g., consumer iterators).
+
+kafka-python is best used with 0.9 brokers, but is backwards-compatible with
+older versions (to 0.8.0). Some features will only be enabled on newer brokers,
+however; for example, fully coordinated consumer groups -- i.e., dynamic
+partition assignment to multiple consumers in the same group -- requires use of
+0.9 kafka brokers. Supporting this feature for earlier broker releases would
+require writing and maintaining custom leadership election and membership /
+health check code (perhaps using zookeeper or consul). For older brokers, you
+can achieve something similar by manually assigning different partitions to
+each consumer instance with config management tools like chef, ansible, etc.
+This approach will work fine, though it does not support rebalancing on
+failures. See `Compatibility <compatibility.html>`_ for more details.
+
+Please note that the master branch may contain unreleased features. For release
+documentation, please see readthedocs and/or python's inline help.
+
+>>> pip install kafka-python

 KafkaConsumer
 *************

+:class:`~kafka.KafkaConsumer` is a high-level message consumer, intended to
+operate as similarly as possible to the official 0.9 java client. Full support
+for coordinated consumer groups requires use of kafka brokers that support the
+0.9 Group APIs.
+
+See `KafkaConsumer <apidoc/KafkaConsumer.html>`_ for API and configuration details.
+
+The consumer iterator returns ConsumerRecords, which are simple namedtuples
+that expose basic message attributes: topic, partition, offset, key, and value:
+
 >>> from kafka import KafkaConsumer
 >>> consumer = KafkaConsumer('my_favorite_topic')
 >>> for msg in consumer:
 ...     print (msg)

-:class:`~kafka.consumer.KafkaConsumer` is a full-featured,
-high-level message consumer class that is similar in design and function to the
-new 0.9 java consumer. Most configuration parameters defined by the official
-java client are supported as optional kwargs, with generally similar behavior.
-Gzip and Snappy compressed messages are supported transparently.
-
-In addition to the standard
-:meth:`~kafka.consumer.KafkaConsumer.poll` interface (which returns
-micro-batches of messages, grouped by topic-partition), kafka-python supports
-single-message iteration, yielding :class:`~kafka.consumer.ConsumerRecord`
-namedtuples, which include the topic, partition, offset, key, and value of each
-message.
+>>> # manually assign the partition list for the consumer
+>>> from kafka import TopicPartition
+>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+>>> consumer.assign([TopicPartition('foobar', 2)])
+>>> msg = next(consumer)

-By default, :class:`~kafka.consumer.KafkaConsumer` will attempt to auto-commit
-message offsets every 5 seconds. When used with 0.9 kafka brokers,
-:class:`~kafka.consumer.KafkaConsumer` will dynamically assign partitions using
-the kafka GroupCoordinator APIs and a
-:class:`~kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor`
-partitioning strategy, enabling relatively straightforward parallel consumption
-patterns. See :doc:`usage` for examples.
+>>> # Deserialize msgpack-encoded values
+>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
+>>> consumer.subscribe(['msgpackfoo'])
+>>> for msg in consumer:
+...     assert isinstance(msg.value, dict)

 KafkaProducer
 *************

-TBD
+:class:`~kafka.KafkaProducer` is a high-level, asynchronous message producer.
+The class is intended to operate as similarly as possible to the official java
+client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
+
+>>> from kafka import KafkaProducer
+>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
+>>> producer.send('foobar', b'some_message_bytes')
+
+>>> # Blocking send
+>>> producer.send('foobar', b'another_message').get(timeout=60)
+
+>>> # Use a key for hashed-partitioning
+>>> producer.send('foobar', key=b'foo', value=b'bar')
+
+>>> # Serialize json messages
+>>> import json
+>>> producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
+>>> producer.send('fizzbuzz', {'foo': 'bar'})
+
+>>> # Serialize string keys
+>>> producer = KafkaProducer(key_serializer=str.encode)
+>>> producer.send('flipflap', key='ping', value=b'1234')
+
+>>> # Compress messages
+>>> producer = KafkaProducer(compression_type='gzip')
+>>> for i in range(1000):
+...     producer.send('foobar', b'msg %d' % i)
+
+
+Compression
+***********
+
+kafka-python supports gzip compression/decompression natively. To produce or
+consume snappy and lz4 compressed messages, you must install lz4 (lz4-cffi
+if using pypy) and/or python-snappy (also requires snappy library).
+See `Installation <install.html#optional-snappy-install>`_ for more information.

 Protocol

@@ -78,6 +129,7 @@ SimpleConsumer and SimpleProducer.
   :maxdepth: 2

   Usage Overview <usage>
+  Simple Clients [deprecated] <simple>
   API </apidoc/modules>
   install
   tests

diff --git a/docs/simple.rst b/docs/simple.rst
new file mode 100644
index 0000000..00a21ac
--- /dev/null
+++ b/docs/simple.rst
@@ -0,0 +1,144 @@
+Simple APIs (DEPRECATED)
+************************
+
+
+SimpleConsumer
+==============
+
+.. code:: python
+
+    from kafka import SimpleConsumer, SimpleClient
+
+    # To consume messages
+    client = SimpleClient('localhost:9092')
+    consumer = SimpleConsumer(client, "my-group", "my-topic")
+    for message in consumer:
+        # message is raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.decode('utf-8')`
+        print(message)
+
+
+    # Use multiprocessing for parallel consumers
+    from kafka import MultiProcessConsumer
+
+    # This will split the number of partitions among two processes
+    consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=2)
+
+    # This will spawn processes such that each handles 2 partitions max
+    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
+                                    partitions_per_proc=2)
+
+    for message in consumer:
+        print(message)
+
+    for message in consumer.get_messages(count=5, block=True, timeout=4):
+        print(message)
+
+    client.close()
+
+
+SimpleProducer
+==============
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages asynchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=True)
+    producer.send_messages('my-topic', b'async message')
+
+    # To send messages in batch. You can use any of the available
+    # producers for doing this. The following producer will collect
+    # messages in batch and send them to Kafka after 20 messages are
+    # collected or every 60 seconds
+    # Notes:
+    # * If the producer dies before the messages are sent, there will be losses
+    # * Call producer.stop() to send the messages and cleanup
+    producer = SimpleProducer(client,
+                              async=True,
+                              batch_send_every_n=20,
+                              batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages synchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=False)
+
+    # Note that the application is responsible for encoding messages to type bytes
+    producer.send_messages('my-topic', b'some message')
+    producer.send_messages('my-topic', b'this method', b'is variadic')
+
+    # Send unicode message
+    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
+
+    # To wait for acknowledgements
+    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
+    #                         a local log before sending response
+    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+    #                            by all in sync replicas before sending a response
+    producer = SimpleProducer(client,
+                              async=False,
+                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                              ack_timeout=2000,
+                              sync_fail_on_error=False)
+
+    responses = producer.send_messages('my-topic', b'another message')
+    for r in responses:
+        logging.info(r.offset)
+
+
+KeyedProducer
+=============
+
+.. code:: python
+
+    from kafka import (
+        SimpleClient, KeyedProducer,
+        Murmur2Partitioner, RoundRobinPartitioner)
+
+    kafka = SimpleClient('localhost:9092')
+
+    # HashedPartitioner is default (currently uses python hash())
+    producer = KeyedProducer(kafka)
+    producer.send_messages(b'my-topic', b'key1', b'some message')
+    producer.send_messages(b'my-topic', b'key2', b'this method')
+
+    # Murmur2Partitioner attempts to mirror the java client hashing
+    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+    # Or just produce round-robin (or just use SimpleProducer)
+    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+
+SimpleClient
+============
+
+
+.. code:: python
+
+    from kafka import SimpleClient, create_message
+    from kafka.protocol import KafkaProtocol
+    from kafka.common import ProduceRequest
+
+    kafka = SimpleClient("localhost:9092")
+
+    req = ProduceRequest(topic="my-topic", partition=1,
+                         messages=[create_message("some message")])
+    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
+    kafka.close()
+
+    resps[0].topic      # "my-topic"
+    resps[0].partition  # 1
+    resps[0].error      # 0 (hopefully)
+    resps[0].offset     # offset of the first message sent in this request

diff --git a/docs/usage.rst b/docs/usage.rst
index e74e5af..f2bea06 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -50,85 +50,41 @@ There are many configuration options for the consumer class. See
 :class:`~kafka.KafkaConsumer` API documentation for more details.

-SimpleProducer
+KafkaProducer
 ==============

-Asynchronous Mode
------------------
-
-.. code:: python
-
-    from kafka import SimpleProducer, SimpleClient
-
-    # To send messages asynchronously
-    client = SimpleClient('localhost:9092')
-    producer = SimpleProducer(client, async=True)
-    producer.send_messages('my-topic', b'async message')
-
-    # To send messages in batch. You can use any of the available
-    # producers for doing this. The following producer will collect
-    # messages in batch and send them to Kafka after 20 messages are
-    # collected or every 60 seconds
-    # Notes:
-    # * If the producer dies before the messages are sent, there will be losses
-    # * Call producer.stop() to send the messages and cleanup
-    producer = SimpleProducer(client,
-                              async=True,
-                              batch_send_every_n=20,
-                              batch_send_every_t=60)
-
-Synchronous Mode
-----------------
-
 .. code:: python

-    from kafka import SimpleProducer, SimpleClient
+    from kafka import KafkaProducer
+    from kafka.common import KafkaError

-    # To send messages synchronously
-    client = SimpleClient('localhost:9092')
-    producer = SimpleProducer(client, async=False)
+    producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

-    # Note that the application is responsible for encoding messages to type bytes
-    producer.send_messages('my-topic', b'some message')
-    producer.send_messages('my-topic', b'this method', b'is variadic')
+    # Asynchronous by default
+    future = producer.send('my-topic', b'raw_bytes')

-    # Send unicode message
-    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
-
-    # To wait for acknowledgements
-    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
-    #                         a local log before sending response
-    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
-    #                            by all in sync replicas before sending a response
-    producer = SimpleProducer(client,
-                              async=False,
-                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                              ack_timeout=2000,
-                              sync_fail_on_error=False)
-
-    responses = producer.send_messages('my-topic', b'another message')
-    for r in responses:
-        logging.info(r.offset)
-
-
-KeyedProducer
-=============
-
-.. code:: python
+    # Block for 'synchronous' sends
+    try:
+        record_metadata = future.get(timeout=10)
+    except KafkaError:
+        # Decide what to do if produce request failed...
+        log.exception()
+        pass

-    from kafka import (
-        SimpleClient, KeyedProducer,
-        Murmur2Partitioner, RoundRobinPartitioner)
+    # Successful result returns assigned partition and offset
+    print (record_metadata.topic)
+    print (record_metadata.partition)
+    print (record_metadata.offset)

-    kafka = SimpleClient('localhost:9092')
+    # produce keyed messages to enable hashed partitioning
+    producer.send('my-topic', key=b'foo', value=b'bar')

-    # HashedPartitioner is default (currently uses python hash())
-    producer = KeyedProducer(kafka)
-    producer.send_messages(b'my-topic', b'key1', b'some message')
-    producer.send_messages(b'my-topic', b'key2', b'this methode')
+    # encode objects via msgpack
+    producer = KafkaProducer(value_serializer=msgpack.dumps)
+    producer.send('msgpack-topic', {'key': 'value'})

-    # Murmur2Partitioner attempts to mirror the java client hashing
-    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+    # produce json messages
+    producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
+    producer.send('json-topic', {'key': 'value'})

-    # Or just produce round-robin (or just use SimpleProducer)
-    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+    # configure multiple retries
+    producer = KafkaProducer(retries=5)
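
The new introduction above describes coordinated consumer groups (dynamic partition
assignment among consumers that share a group, available with 0.9 brokers), but none
of the examples in this diff actually pass a ``group_id``. A minimal sketch of that
usage, assuming a 0.9 broker at localhost:9092 and made-up topic/group names; this is
a supplementary illustration, not part of the changed files:

.. code:: python

    from kafka import KafkaConsumer

    # Consumers started with the same group_id form one consumer group; with
    # 0.9 brokers the group coordinator spreads the topic's partitions across
    # the members and rebalances automatically when members join or fail.
    consumer = KafkaConsumer('my_favorite_topic',
                             group_id='my-group',              # made-up group name
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             enable_auto_commit=True)

    for msg in consumer:
        # msg is a ConsumerRecord namedtuple: topic, partition, offset, key, value
        print(msg.topic, msg.partition, msg.offset, msg.value)

Running a second copy of this script with the same ``group_id`` should cause the
broker to split the topic's partitions between the two instances.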
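
The Compression section added in docs/index.rst notes that snappy and lz4 need extra
packages, but only gzip is shown in the examples. A minimal sketch of producing
snappy-compressed messages, assuming python-snappy is installed and a broker at
localhost:9092 (both assumptions, not part of the diff):

.. code:: python

    from kafka import KafkaProducer

    # gzip works out of the box; snappy additionally requires the optional
    # python-snappy package (and the system snappy library):
    #     pip install python-snappy
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             compression_type='snappy')
    producer.send('foobar', b'snappy-compressed message')
    producer.flush()  # block until buffered messages have actually been sent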