| author | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:14:49 -0800 | 
|---|---|---|
| committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 17:14:56 -0800 | 
| commit | d4e85ecd1d8acac1a0f74d164b67faefd99987e4 (patch) | |
| tree | 04d754bbd47230cd0c979926a0730750005d5e2d /docs/usage.rst | |
| parent | 2a2e77aa1e5c31b3e815d573051bb2019daaa306 (diff) | |
| download | kafka-python-d4e85ecd1d8acac1a0f74d164b67faefd99987e4.tar.gz | |
Update docs for release w/ new async classes
Diffstat (limited to 'docs/usage.rst')
| -rw-r--r-- | docs/usage.rst | 246 | 
1 file changed, 88 insertions, 158 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 6417cd8..e74e5af 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,68 +1,126 @@
 Usage
-=====
+*****
 
-SimpleProducer
---------------
+
+KafkaConsumer
+=============
 
 .. code:: python
 
-    from kafka import SimpleProducer, KafkaClient
+    from kafka import KafkaConsumer
 
-    # To send messages synchronously
-    kafka = KafkaClient('localhost:9092')
-    producer = SimpleProducer(kafka)
+    # To consume latest messages and auto-commit offsets
+    consumer = KafkaConsumer('my-topic',
+                             group_id='my-group',
+                             bootstrap_servers=['localhost:9092'])
+    for message in consumer:
+        # message value and key are raw bytes -- decode if necessary!
+        # e.g., for unicode: `message.value.decode('utf-8')`
+        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                              message.offset, message.key,
+                                              message.value))
 
-    # Note that the application is responsible for encoding messages to type bytes
-    producer.send_messages(b'my-topic', b'some message')
-    producer.send_messages(b'my-topic', b'this method', b'is variadic')
+    # consume earliest available messages, dont commit offsets
+    KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
 
-    # Send unicode message
-    producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+    # consume json messages
+    KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+    # consume msgpack
+    KafkaConsumer(value_deserializer=msgpack.unpackb)
+
+    # StopIteration if no message after 1sec
+    KafkaConsumer(consumer_timeout_ms=1000)
+
+    # Subscribe to a regex topic pattern
+    consumer = KafkaConsumer()
+    consumer.subscribe(pattern='^awesome.*')
+
+    # Use multiple consumers in parallel w/ 0.9 kafka brokers
+    # typically you would run each on a different server / process / CPU
+    consumer1 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              bootstrap_servers='my.server.com')
+    consumer2 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              bootstrap_servers='my.server.com')
+
+
+There are many configuration options for the consumer class. See
+:class:`~kafka.KafkaConsumer` API documentation for more details.
+
+
+SimpleProducer
+==============
 
 Asynchronous Mode
 -----------------
 
 .. code:: python
 
+    from kafka import SimpleProducer, SimpleClient
+
     # To send messages asynchronously
-    producer = SimpleProducer(kafka, async=True)
-    producer.send_messages(b'my-topic', b'async message')
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=True)
+    producer.send_messages('my-topic', b'async message')
+
+    # To send messages in batch. You can use any of the available
+    # producers for doing this. The following producer will collect
+    # messages in batch and send them to Kafka after 20 messages are
+    # collected or every 60 seconds
+    # Notes:
+    # * If the producer dies before the messages are sent, there will be losses
+    # * Call producer.stop() to send the messages and cleanup
+    producer = SimpleProducer(client,
+                              async=True,
+                              batch_send_every_n=20,
+                              batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages synchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=False)
+
+    # Note that the application is responsible for encoding messages to type bytes
+    producer.send_messages('my-topic', b'some message')
+    producer.send_messages('my-topic', b'this method', b'is variadic')
+
+    # Send unicode message
+    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
 
     # To wait for acknowledgements
     # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
     #                         a local log before sending response
     # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
     #                            by all in sync replicas before sending a response
-    producer = SimpleProducer(kafka, async=False,
+    producer = SimpleProducer(client,
+                              async=False,
                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                               ack_timeout=2000,
                               sync_fail_on_error=False)
 
-    responses = producer.send_messages(b'my-topic', b'another message')
+    responses = producer.send_messages('my-topic', b'another message')
     for r in responses:
         logging.info(r.offset)
 
-    # To send messages in batch. You can use any of the available
-    # producers for doing this. The following producer will collect
-    # messages in batch and send them to Kafka after 20 messages are
-    # collected or every 60 seconds
-    # Notes:
-    # * If the producer dies before the messages are sent, there will be losses
-    # * Call producer.stop() to send the messages and cleanup
-    producer = SimpleProducer(kafka, async=True,
-                              batch_send_every_n=20,
-                              batch_send_every_t=60)
 
-Keyed messages
---------------
+KeyedProducer
+=============
 
 .. code:: python
 
     from kafka import (
-        KafkaClient, KeyedProducer,
+        SimpleClient, KeyedProducer,
         Murmur2Partitioner, RoundRobinPartitioner)
 
-    kafka = KafkaClient('localhost:9092')
+    kafka = SimpleClient('localhost:9092')
 
     # HashedPartitioner is default (currently uses python hash())
    producer = KeyedProducer(kafka)
@@ -74,131 +132,3 @@ Keyed messages
 
     # Or just produce round-robin (or just use SimpleProducer)
     producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-
-
-
-KafkaConsumer
--------------
-
-.. code:: python
-
-    from kafka import KafkaConsumer
-
-    # To consume messages
-    consumer = KafkaConsumer('my-topic',
-                             group_id='my_group',
-                             bootstrap_servers=['localhost:9092'])
-    for message in consumer:
-        # message value is raw byte string -- decode if necessary!
-        # e.g., for unicode: `message.value.decode('utf-8')`
-        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-                                             message.offset, message.key,
-                                             message.value))
-
-
-messages (m) are namedtuples with attributes:
-
-  * `m.topic`: topic name (str)
-  * `m.partition`: partition number (int)
-  * `m.offset`: message offset on topic-partition log (int)
-  * `m.key`: key (bytes - can be None)
-  * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-
-.. code:: python
-
-    from kafka import KafkaConsumer
-
-    # more advanced consumer -- multiple topics w/ auto commit offset
-    # management
-    consumer = KafkaConsumer('topic1', 'topic2',
-                             bootstrap_servers=['localhost:9092'],
-                             group_id='my_consumer_group',
-                             auto_commit_enable=True,
-                             auto_commit_interval_ms=30 * 1000,
-                             auto_offset_reset='smallest')
-
-    # Infinite iteration
-    for m in consumer:
-      do_some_work(m)
-
-      # Mark this message as fully consumed
-      # so it can be included in the next commit
-      #
-      # **messages that are not marked w/ task_done currently do not commit!
-      consumer.task_done(m)
-
-    # If auto_commit_enable is False, remember to commit() periodically
-    consumer.commit()
-
-    # Batch process interface
-    while True:
-      for m in kafka.fetch_messages():
-        process_message(m)
-        consumer.task_done(m)
-
-
-  Configuration settings can be passed to constructor,
-  otherwise defaults will be used:
-
-.. code:: python
-
-      client_id='kafka.consumer.kafka',
-      group_id=None,
-      fetch_message_max_bytes=1024*1024,
-      fetch_min_bytes=1,
-      fetch_wait_max_ms=100,
-      refresh_leader_backoff_ms=200,
-      bootstrap_servers=[],
-      socket_timeout_ms=30*1000,
-      auto_offset_reset='largest',
-      deserializer_class=lambda msg: msg,
-      auto_commit_enable=False,
-      auto_commit_interval_ms=60 * 1000,
-      consumer_timeout_ms=-1
-
-  Configuration parameters are described in more detail at
-  http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
-Multiprocess consumer
----------------------
-
-.. code:: python
-
-    from kafka import KafkaClient, MultiProcessConsumer
-
-    kafka = KafkaClient('localhost:9092')
-
-    # This will split the number of partitions among two processes
-    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
-
-    # This will spawn processes such that each handles 2 partitions max
-    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
-                                    partitions_per_proc=2)
-
-    for message in consumer:
-        print(message)
-
-    for message in consumer.get_messages(count=5, block=True, timeout=4):
-        print(message)
-
-Low level
----------
-
-.. code:: python
-
-    from kafka import KafkaClient, create_message
-    from kafka.protocol import KafkaProtocol
-    from kafka.common import ProduceRequest
-
-    kafka = KafkaClient('localhost:9092')
-
-    req = ProduceRequest(topic=b'my-topic', partition=1,
-        messages=[create_message(b'some message')])
-    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
-    kafka.close()
-
-    resps[0].topic      # b'my-topic'
-    resps[0].partition  # 1
-    resps[0].error      # 0 (hopefully)
-    resps[0].offset     # offset of the first message sent in this request
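The new consumer section above ends by noting that KafkaConsumer accepts many configuration
options. As an illustrative sketch only (not part of this commit), several of the options shown
in the updated docs can be combined in a single constructor call; the broker address, topic name,
and group id below are placeholder assumptions:

.. code:: python

    import json

    from kafka import KafkaConsumer

    # Combine several of the options demonstrated in the updated docs in one call.
    # All parameter names here appear in the usage examples above.
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest',    # start from earliest available offsets
                             enable_auto_commit=False,        # do not auto-commit offsets
                             consumer_timeout_ms=1000,        # stop iterating after 1s with no messages
                             value_deserializer=lambda m: json.loads(m.decode('ascii')))

    for message in consumer:
        # message.value has already been deserialized by value_deserializer
        print("%s:%d:%d: value=%r" % (message.topic, message.partition,
                                      message.offset, message.value))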

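The parallel-consumer example in the diff notes that each consumer in a group would typically run
on a different server, process, or CPU. A minimal sketch of the process-per-consumer variant,
assuming a 0.9 broker on localhost:9092 and a 'my-topic' topic with at least two partitions
(all names are placeholders):

.. code:: python

    from multiprocessing import Process

    from kafka import KafkaConsumer

    def consume(worker_id):
        # Each process creates its own consumer in the same group; the 0.9
        # group coordinator assigns each one a disjoint set of partitions.
        consumer = KafkaConsumer('my-topic',
                                 group_id='my-group',
                                 bootstrap_servers=['localhost:9092'])
        for message in consumer:
            print('worker %d got %s:%d:%d' % (worker_id, message.topic,
                                              message.partition, message.offset))

    if __name__ == '__main__':
        workers = [Process(target=consume, args=(i,)) for i in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()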