44 files changed, 2071 insertions, 959 deletions
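The headline addition in this release is the high-level KafkaConsumer class (dpkp PR 234), documented in the docs/usage.rst changes further down. A minimal sketch of that interface as described there; the broker address, topic, and group names are placeholders:

.. code:: python

    from kafka import KafkaConsumer

    # Configuration keys follow the 0.9.3-era API (metadata_broker_list);
    # the broker address and topic name here are examples only.
    consumer = KafkaConsumer("my-topic",
                             group_id="my_group",
                             metadata_broker_list=["localhost:9092"])

    for message in consumer:
        # messages are namedtuples with topic, partition, offset, key, value
        print(message)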
@@ -8,3 +8,4 @@ env servers/*/kafka-bin .coverage .noseids +docs/_build diff --git a/.travis.yml b/.travis.yml index f4fc66b..7184bc8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,13 +12,14 @@ env: - KAFKA_VERSION=0.8.0 - KAFKA_VERSION=0.8.1 - KAFKA_VERSION=0.8.1.1 + - KAFKA_VERSION=0.8.2.0 before_install: - sudo apt-get install libsnappy-dev - ./build_integration.sh install: - - pip install tox + - pip install tox coveralls - pip install . # Deal with issue on Travis builders re: multiprocessing.Queue :( # See https://github.com/travis-ci/travis-cookbooks/issues/155 @@ -38,3 +39,6 @@ deploy: script: - if [ -n "$UNIT_AND_LINT_ONLY" ]; then tox -e lint,`./travis_selector.sh $TRAVIS_PYTHON_VERSION`; else tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`; fi + +after_success: + - coveralls @@ -1,5 +1,26 @@ -# 0.9.3 (Unreleased) +# 0.9.3 (Feb 3, 2015) +* Add coveralls.io support (sontek PR 307) +* Fix python2.6 threading.Event bug in ReentrantTimer (dpkp PR 312) +* Add kafka 0.8.2.0 to travis integration tests (dpkp PR 310) +* Auto-convert topics to utf-8 bytes in Producer (sontek PR 306) +* Fix reference cycle between SimpleConsumer and ReentrantTimer (zhaopengzp PR 309) +* Add Sphinx API docs (wedaly PR 282) +* Handle additional error cases exposed by 0.8.2.0 kafka server (dpkp PR 295) +* Refactor error class management (alexcb PR 289) +* Expose KafkaConsumer in __all__ for easy imports (Dinoshauer PR 286) +* SimpleProducer starts on random partition by default (alexcb PR 288) +* Add keys to compressed messages (meandthewallaby PR 281) +* Add new high-level KafkaConsumer class based on java client api (dpkp PR 234) +* Add KeyedProducer.send_messages api (pubnub PR 277) +* Fix consumer pending() method (jettify PR 276) +* Update low-level demo in README (sunisdown PR 274) +* Include key in KeyedProducer messages (se7entyse7en PR 268) +* Fix SimpleConsumer timeout behavior in get_messages (dpkp PR 238) +* Fix error in consumer.py test against max_buffer_size (rthille/wizzat PR 225/242) +* Improve string concat performance on pypy / py3 (dpkp PR 233) +* Reorg directory layout for consumer/producer/partitioners (dpkp/wizzat PR 232/243) +* Add OffsetCommitContext (locationlabs PR 217) * Metadata Refactor (dpkp PR 223) * Add Python 3 support (brutasse/wizzat - PR 227) * Minor cleanups - imports / README / PyPI classifiers (dpkp - PR 221) @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2014 David Arthur + Copyright 2015 David Arthur Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md deleted file mode 100644 index 6655b92..0000000 --- a/README.md +++ /dev/null @@ -1,264 +0,0 @@ -# Kafka Python client - -[](https://travis-ci.org/mumrah/kafka-python) - -This module provides low-level protocol support for Apache Kafka as well as -high-level consumer and producer classes. Request batching is supported by the -protocol as well as broker-aware request routing. Gzip and Snappy compression -is also supported for message sets. - -http://kafka.apache.org/ - -On Freenode IRC at #kafka-python, as well as #apache-kafka - -For general discussion of kafka-client design and implementation (not python specific), -see https://groups.google.com/forum/m/#!forum/kafka-clients - -# License - -Copyright 2014, David Arthur under Apache License, v2.0. 
See `LICENSE` - -# Status - -The current stable version of this package is [**0.9.2**](https://github.com/mumrah/kafka-python/releases/tag/v0.9.2) and is compatible with - -Kafka broker versions -- 0.8.0 -- 0.8.1 -- 0.8.1.1 - -Python versions -- 2.6 (tested on 2.6.9) -- 2.7 (tested on 2.7.8) -- pypy (tested on pypy 2.3.1 / python 2.7.6) -- (Python 3.3 and 3.4 support has been added to trunk and will be available the next release) - -# Usage - -## High level - -```python -from kafka import KafkaClient, SimpleProducer, SimpleConsumer - -# To send messages synchronously -kafka = KafkaClient("localhost:9092") -producer = SimpleProducer(kafka) - -# Note that the application is responsible for encoding messages to type str -producer.send_messages("my-topic", "some message") -producer.send_messages("my-topic", "this method", "is variadic") - -# Send unicode message -producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) - -# To send messages asynchronously -# WARNING: current implementation does not guarantee message delivery on failure! -# messages can get dropped! Use at your own risk! Or help us improve with a PR! -producer = SimpleProducer(kafka, async=True) -producer.send_messages("my-topic", "async message") - -# To wait for acknowledgements -# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to -# a local log before sending response -# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed -# by all in sync replicas before sending a response -producer = SimpleProducer(kafka, async=False, - req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=2000) - -response = producer.send_messages("my-topic", "another message") - -if response: - print(response[0].error) - print(response[0].offset) - -# To send messages in batch. You can use any of the available -# producers for doing this. The following producer will collect -# messages in batch and send them to Kafka after 20 messages are -# collected or every 60 seconds -# Notes: -# * If the producer dies before the messages are sent, there will be losses -# * Call producer.stop() to send the messages and cleanup -producer = SimpleProducer(kafka, batch_send=True, - batch_send_every_n=20, - batch_send_every_t=60) - -# To consume messages -consumer = SimpleConsumer(kafka, "my-group", "my-topic") -for message in consumer: - # message is raw byte string -- decode if necessary! 
- # e.g., for unicode: `message.decode('utf-8')` - print(message) - -kafka.close() -``` - -## Keyed messages -```python -from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner - -kafka = KafkaClient("localhost:9092") - -# HashedPartitioner is default -producer = KeyedProducer(kafka) -producer.send("my-topic", "key1", "some message") -producer.send("my-topic", "key2", "this methode") - -producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) -``` - -## Multiprocess consumer -```python -from kafka import KafkaClient, MultiProcessConsumer - -kafka = KafkaClient("localhost:9092") - -# This will split the number of partitions among two processes -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) - -# This will spawn processes such that each handles 2 partitions max -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", - partitions_per_proc=2) - -for message in consumer: - print(message) - -for message in consumer.get_messages(count=5, block=True, timeout=4): - print(message) -``` - -## Low level - -```python -from kafka import KafkaClient, create_message -from kafka.protocol import KafkaProtocol -from kafka.common import ProduceRequest - -kafka = KafkaClient("localhost:9092") - -req = ProduceRequest(topic="my-topic", partition=1, - messages=[create_message("some message")]) -resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) -kafka.close() - -resps[0].topic # "my-topic" -resps[0].partition # 1 -resps[0].error # 0 (hopefully) -resps[0].offset # offset of the first message sent in this request -``` - -# Install - -Install with your favorite package manager - -## Latest Release -Pip: - -```shell -pip install kafka-python -``` - -Releases are also listed at https://github.com/mumrah/kafka-python/releases - - -## Bleeding-Edge -```shell -git clone https://github.com/mumrah/kafka-python -pip install ./kafka-python -``` - -Setuptools: -```shell -git clone https://github.com/mumrah/kafka-python -easy_install ./kafka-python -``` - -Using `setup.py` directly: -```shell -git clone https://github.com/mumrah/kafka-python -cd kafka-python -python setup.py install -``` - -## Optional Snappy install - -### Install Development Libraries -Download and build Snappy from http://code.google.com/p/snappy/downloads/list - -Ubuntu: -```shell -apt-get install libsnappy-dev -``` - -OSX: -```shell -brew install snappy -``` - -From Source: -```shell -wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz -tar xzvf snappy-1.0.5.tar.gz -cd snappy-1.0.5 -./configure -make -sudo make install -``` - -### Install Python Module -Install the `python-snappy` module -```shell -pip install python-snappy -``` - -# Tests - -## Run the unit tests - -```shell -tox -``` - -## Run a subset of unit tests -```shell -# run protocol tests only -tox -- -v test.test_protocol -``` - -```shell -# test with pypy only -tox -e pypy -``` - -```shell -# Run only 1 test, and use python 2.7 -tox -e py27 -- -v --with-id --collect-only -# pick a test number from the list like #102 -tox -e py27 -- -v --with-id 102 -``` - -## Run the integration tests - -The integration tests will actually start up real local Zookeeper -instance and Kafka brokers, and send messages in using the client. - -First, get the kafka binaries for integration testing: -```shell -./build_integration.sh -``` -By default, the build_integration.sh script will download binary -distributions for all supported kafka versions. 
-To test against the latest source build, set KAFKA_VERSION=trunk -and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) -```shell -SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh -``` - -Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` -env variable to the server build you want to use for testing: -```shell -KAFKA_VERSION=0.8.0 tox -KAFKA_VERSION=0.8.1 tox -KAFKA_VERSION=0.8.1.1 tox -KAFKA_VERSION=trunk tox -``` diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..5405f92 --- /dev/null +++ b/README.rst @@ -0,0 +1,53 @@ +Kafka Python client +------------------------ +.. image:: https://api.travis-ci.org/mumrah/kafka-python.png?branch=master + :target: https://travis-ci.org/mumrah/kafka-python + :alt: Build Status + +.. image:: https://coveralls.io/repos/mumrah/kafka-python/badge.svg?branch=master + :target: https://coveralls.io/r/mumrah/kafka-python?branch=master + :alt: Coverage Status + +.. image:: https://readthedocs.org/projects/kafka-python/badge/?version=latest + :target: http://kafka-python.readthedocs.org/en/latest/ + :alt: Full documentation available on ReadTheDocs + +`Full documentation available on ReadTheDocs`_ + +This module provides low-level protocol support for Apache Kafka as well as +high-level consumer and producer classes. Request batching is supported by the +protocol as well as broker-aware request routing. Gzip and Snappy compression +is also supported for message sets. + +http://kafka.apache.org/ + +On Freenode IRC at #kafka-python, as well as #apache-kafka + +For general discussion of kafka-client design and implementation (not python specific), +see https://groups.google.com/forum/#!forum/kafka-clients + +License +---------- +Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE` + +Status +---------- +The current stable version of this package is `0.9.3`_ and is compatible with: + +Kafka broker versions + +- 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] +- 0.8.1.1 +- 0.8.1 +- 0.8.0 + +Python versions + +- 2.6 (tested on 2.6.9) +- 2.7 (tested on 2.7.9) +- 3.3 (tested on 3.3.5) +- 3.4 (tested on 3.4.2) +- pypy (tested on pypy 2.4.0 / python 2.7.8) + +.. _Full documentation available on ReadTheDocs: http://kafka-python.readthedocs.org/en/latest/ +.. 
_0.9.3: https://github.com/mumrah/kafka-python/releases/tag/v0.9.3 @@ -1 +1 @@ -0.9.3-dev +0.9.4-dev diff --git a/build_integration.sh b/build_integration.sh index bb46e54..2b81745 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,11 +1,11 @@ #!/bin/bash # Versions available for testing via binary distributions -OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1" +OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.0" # Useful configuration vars, with sensible defaults if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.8.0 + SCALA_VERSION=2.10 fi # On travis CI, empty KAFKA_VERSION means skip integration tests @@ -45,12 +45,18 @@ pushd servers echo "-------------------------------------" echo "Checking kafka binaries for ${kafka}" echo - wget -N https://archive.apache.org/dist/kafka/$kafka/kafka_${SCALA_VERSION}-${kafka}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/kafka_${SCALA_VERSION}-${kafka}.tar.gz + # kafka 0.8.0 is only available w/ scala 2.8.0 + if [ "$kafka" == "0.8.0" ]; then + KAFKA_ARTIFACT="kafka_2.8.0-${kafka}" + else + KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}" + fi + wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz echo if [ ! -d "../$kafka/kafka-bin" ]; then echo "Extracting kafka binaries for ${kafka}" - tar xzvf kafka_${SCALA_VERSION}-${kafka}.t* -C ../$kafka/ - mv ../$kafka/kafka_${SCALA_VERSION}-${kafka} ../$kafka/kafka-bin + tar xzvf ${KAFKA_ARTIFACT}.t* -C ../$kafka/ + mv ../$kafka/${KAFKA_ARTIFACT} ../$kafka/kafka-bin else echo "$kafka/kafka-bin directory already exists -- skipping tgz extraction" fi diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..5751f68 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . 
+ +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make <target>' where <target> is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR)/* + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." + +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." + +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." 
+ @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." diff --git a/docs/apidoc/kafka.consumer.rst b/docs/apidoc/kafka.consumer.rst new file mode 100644 index 0000000..8595f99 --- /dev/null +++ b/docs/apidoc/kafka.consumer.rst @@ -0,0 +1,46 @@ +kafka.consumer package +====================== + +Submodules +---------- + +kafka.consumer.base module +-------------------------- + +.. automodule:: kafka.consumer.base + :members: + :undoc-members: + :show-inheritance: + +kafka.consumer.kafka module +--------------------------- + +.. automodule:: kafka.consumer.kafka + :members: + :undoc-members: + :show-inheritance: + +kafka.consumer.multiprocess module +---------------------------------- + +.. automodule:: kafka.consumer.multiprocess + :members: + :undoc-members: + :show-inheritance: + +kafka.consumer.simple module +---------------------------- + +.. automodule:: kafka.consumer.simple + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. 
automodule:: kafka.consumer + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/apidoc/kafka.partitioner.rst b/docs/apidoc/kafka.partitioner.rst new file mode 100644 index 0000000..ea215f1 --- /dev/null +++ b/docs/apidoc/kafka.partitioner.rst @@ -0,0 +1,38 @@ +kafka.partitioner package +========================= + +Submodules +---------- + +kafka.partitioner.base module +----------------------------- + +.. automodule:: kafka.partitioner.base + :members: + :undoc-members: + :show-inheritance: + +kafka.partitioner.hashed module +------------------------------- + +.. automodule:: kafka.partitioner.hashed + :members: + :undoc-members: + :show-inheritance: + +kafka.partitioner.roundrobin module +----------------------------------- + +.. automodule:: kafka.partitioner.roundrobin + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: kafka.partitioner + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/apidoc/kafka.producer.rst b/docs/apidoc/kafka.producer.rst new file mode 100644 index 0000000..bd850bb --- /dev/null +++ b/docs/apidoc/kafka.producer.rst @@ -0,0 +1,38 @@ +kafka.producer package +====================== + +Submodules +---------- + +kafka.producer.base module +-------------------------- + +.. automodule:: kafka.producer.base + :members: + :undoc-members: + :show-inheritance: + +kafka.producer.keyed module +--------------------------- + +.. automodule:: kafka.producer.keyed + :members: + :undoc-members: + :show-inheritance: + +kafka.producer.simple module +---------------------------- + +.. automodule:: kafka.producer.simple + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: kafka.producer + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/apidoc/kafka.rst b/docs/apidoc/kafka.rst new file mode 100644 index 0000000..eb04c35 --- /dev/null +++ b/docs/apidoc/kafka.rst @@ -0,0 +1,79 @@ +kafka package +============= + +Subpackages +----------- + +.. toctree:: + + kafka.consumer + kafka.partitioner + kafka.producer + +Submodules +---------- + +kafka.client module +------------------- + +.. automodule:: kafka.client + :members: + :undoc-members: + :show-inheritance: + +kafka.codec module +------------------ + +.. automodule:: kafka.codec + :members: + :undoc-members: + :show-inheritance: + +kafka.common module +------------------- + +.. automodule:: kafka.common + :members: + :undoc-members: + :show-inheritance: + +kafka.conn module +----------------- + +.. automodule:: kafka.conn + :members: + :undoc-members: + :show-inheritance: + +kafka.context module +-------------------- + +.. automodule:: kafka.context + :members: + :undoc-members: + :show-inheritance: + +kafka.protocol module +--------------------- + +.. automodule:: kafka.protocol + :members: + :undoc-members: + :show-inheritance: + +kafka.util module +----------------- + +.. automodule:: kafka.util + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: kafka + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst new file mode 100644 index 0000000..db3e580 --- /dev/null +++ b/docs/apidoc/modules.rst @@ -0,0 +1,7 @@ +kafka +===== + +.. 
toctree:: + :maxdepth: 4 + + kafka diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..ea223c2 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +# +# kafka-python documentation build configuration file, created by +# sphinx-quickstart on Sun Jan 4 12:21:50 2015. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.intersphinx', + 'sphinx.ext.viewcode', + 'sphinxcontrib.napoleon', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'kafka-python' +copyright = u'2015, David Arthur' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +with open('../VERSION') as version_file: + version = version_file.read() + +# The full version, including alpha/beta/rc tags. +release = version + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +#keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. 
See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# "<project> v<release> documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +#html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a <link> tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'kafka-pythondoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { +# The paper size ('letterpaper' or 'a4paper'). +#'papersize': 'letterpaper', + +# The font size ('10pt', '11pt' or '12pt'). +#'pointsize': '10pt', + +# Additional stuff for the LaTeX preamble. +#'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). 
+latex_documents = [ + ('index', 'kafka-python.tex', u'kafka-python Documentation', + u'David Arthur', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'kafka-python', u'kafka-python Documentation', + [u'David Arthur'], 1) +] + +# If true, show URL addresses after external links. +#man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'kafka-python', u'kafka-python Documentation', + u'David Arthur', 'kafka-python', 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False + +on_rtd = os.environ.get('READTHEDOCS', None) == 'True' + +if not on_rtd: # only import and set the theme if we're building docs locally + import sphinx_rtd_theme + html_theme = 'sphinx_rtd_theme' + html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..e4a9ac7 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,58 @@ +kafka-python +============ + +This module provides low-level protocol support for Apache Kafka as well as +high-level consumer and producer classes. Request batching is supported by the +protocol as well as broker-aware request routing. Gzip and Snappy compression +is also supported for message sets. + +http://kafka.apache.org/ + +On Freenode IRC at #kafka-python, as well as #apache-kafka + +For general discussion of kafka-client design and implementation (not python specific), +see https://groups.google.com/forum/m/#!forum/kafka-clients + +Status +------ + +The current stable version of this package is `0.9.3 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.3>`_ and is compatible with: + +Kafka broker versions + +* 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] +* 0.8.1.1 +* 0.8.1 +* 0.8.0 + +Python versions + +* 2.6 (tested on 2.6.9) +* 2.7 (tested on 2.7.9) +* 3.3 (tested on 3.3.5) +* 3.4 (tested on 3.4.2) +* pypy (tested on pypy 2.4.0 / python 2.7.8) + +License +------- + +Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE <https://github.com/mumrah/kafka-python/blob/master/LICENSE>`_. + + +Contents +-------- + +.. 
toctree:: + :maxdepth: 2 + + install + tests + usage + API reference </apidoc/modules> + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/install.rst b/docs/install.rst new file mode 100644 index 0000000..1dd6d4e --- /dev/null +++ b/docs/install.rst @@ -0,0 +1,79 @@ +Install +======= + +Install with your favorite package manager + +Latest Release +-------------- +Pip: + +.. code:: bash + + pip install kafka-python + +Releases are also listed at https://github.com/mumrah/kafka-python/releases + + +Bleeding-Edge +------------- + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + pip install ./kafka-python + +Setuptools: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + easy_install ./kafka-python + +Using `setup.py` directly: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + cd kafka-python + python setup.py install + + +Optional Snappy install +----------------------- + +Install Development Libraries +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Download and build Snappy from http://code.google.com/p/snappy/downloads/list + +Ubuntu: + +.. code:: bash + + apt-get install libsnappy-dev + +OSX: + +.. code:: bash + + brew install snappy + +From Source: + +.. code:: bash + + wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz + tar xzvf snappy-1.0.5.tar.gz + cd snappy-1.0.5 + ./configure + make + sudo make install + +Install Python Module +^^^^^^^^^^^^^^^^^^^^^ + +Install the `python-snappy` module + +.. code:: bash + + pip install python-snappy diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..2e9d7dc --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,242 @@ +@ECHO OFF
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set BUILDDIR=_build
+set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
+set I18NSPHINXOPTS=%SPHINXOPTS% .
+if NOT "%PAPER%" == "" (
+ set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
+ set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
+)
+
+if "%1" == "" goto help
+
+if "%1" == "help" (
+ :help
+ echo.Please use `make ^<target^>` where ^<target^> is one of
+ echo. html to make standalone HTML files
+ echo. dirhtml to make HTML files named index.html in directories
+ echo. singlehtml to make a single large HTML file
+ echo. pickle to make pickle files
+ echo. json to make JSON files
+ echo. htmlhelp to make HTML files and a HTML help project
+ echo. qthelp to make HTML files and a qthelp project
+ echo. devhelp to make HTML files and a Devhelp project
+ echo. epub to make an epub
+ echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
+ echo. text to make text files
+ echo. man to make manual pages
+ echo. texinfo to make Texinfo files
+ echo. gettext to make PO message catalogs
+ echo. changes to make an overview of all changed/added/deprecated items
+ echo. xml to make Docutils-native XML files
+ echo. pseudoxml to make pseudoxml-XML files for display purposes
+ echo. linkcheck to check all external links for integrity
+ echo. doctest to run all doctests embedded in the documentation if enabled
+ goto end
+)
+
+if "%1" == "clean" (
+ for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
+ del /q /s %BUILDDIR%\*
+ goto end
+)
+
+
+%SPHINXBUILD% 2> nul
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.http://sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "html" (
+ %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/html.
+ goto end
+)
+
+if "%1" == "dirhtml" (
+ %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
+ goto end
+)
+
+if "%1" == "singlehtml" (
+ %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
+ goto end
+)
+
+if "%1" == "pickle" (
+ %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the pickle files.
+ goto end
+)
+
+if "%1" == "json" (
+ %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the JSON files.
+ goto end
+)
+
+if "%1" == "htmlhelp" (
+ %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run HTML Help Workshop with the ^
+.hhp project file in %BUILDDIR%/htmlhelp.
+ goto end
+)
+
+if "%1" == "qthelp" (
+ %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^
+.qhcp project file in %BUILDDIR%/qthelp, like this:
+ echo.^> qcollectiongenerator %BUILDDIR%\qthelp\kafka-python.qhcp
+ echo.To view the help file:
+ echo.^> assistant -collectionFile %BUILDDIR%\qthelp\kafka-python.qhc
+ goto end
+)
+
+if "%1" == "devhelp" (
+ %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished.
+ goto end
+)
+
+if "%1" == "epub" (
+ %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The epub file is in %BUILDDIR%/epub.
+ goto end
+)
+
+if "%1" == "latex" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdf" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdfja" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf-ja
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "text" (
+ %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The text files are in %BUILDDIR%/text.
+ goto end
+)
+
+if "%1" == "man" (
+ %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The manual pages are in %BUILDDIR%/man.
+ goto end
+)
+
+if "%1" == "texinfo" (
+ %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
+ goto end
+)
+
+if "%1" == "gettext" (
+ %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
+ goto end
+)
+
+if "%1" == "changes" (
+ %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.The overview file is in %BUILDDIR%/changes.
+ goto end
+)
+
+if "%1" == "linkcheck" (
+ %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Link check complete; look for any errors in the above output ^
+or in %BUILDDIR%/linkcheck/output.txt.
+ goto end
+)
+
+if "%1" == "doctest" (
+ %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Testing of doctests in the sources finished, look at the ^
+results in %BUILDDIR%/doctest/output.txt.
+ goto end
+)
+
+if "%1" == "xml" (
+ %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The XML files are in %BUILDDIR%/xml.
+ goto end
+)
+
+if "%1" == "pseudoxml" (
+ %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
+ goto end
+)
+
+:end
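Among the library changes later in this diff, KafkaClient.get_partition_ids_for_topic now returns a sorted list (empty for unknown topics) instead of None. A minimal sketch of calling code that relies on that behavior, assuming a broker reachable at localhost:9092:

.. code:: python

    from kafka import KafkaClient

    kafka = KafkaClient("localhost:9092")

    # Unknown topics now yield [] rather than None, so iteration is safe
    # without an explicit None check; known topics yield sorted partition ids.
    for partition_id in kafka.get_partition_ids_for_topic("my-topic"):
        print(partition_id)

    kafka.close()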
diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..86b4f05 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,7 @@ +sphinx +sphinxcontrib-napoleon + +# Install kafka-python in editable mode +# This allows the sphinx autodoc module +# to load the Python modules and extract docstrings. +# -e .. diff --git a/docs/tests.rst b/docs/tests.rst new file mode 100644 index 0000000..df9a3ef --- /dev/null +++ b/docs/tests.rst @@ -0,0 +1,59 @@ +Tests +===== + +Run the unit tests +------------------ + +.. code:: bash + + tox + + +Run a subset of unit tests +-------------------------- + +.. code:: bash + + # run protocol tests only + tox -- -v test.test_protocol + + # test with pypy only + tox -e pypy + + # Run only 1 test, and use python 2.7 + tox -e py27 -- -v --with-id --collect-only + + # pick a test number from the list like #102 + tox -e py27 -- -v --with-id 102 + + +Run the integration tests +------------------------- + +The integration tests will actually start up real local Zookeeper +instance and Kafka brokers, and send messages in using the client. + +First, get the kafka binaries for integration testing: + +.. code:: bash + + ./build_integration.sh + +By default, the build_integration.sh script will download binary +distributions for all supported kafka versions. +To test against the latest source build, set KAFKA_VERSION=trunk +and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) + +.. code:: bash + + SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh + +Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` +env variable to the server build you want to use for testing: + +.. code:: bash + + KAFKA_VERSION=0.8.0 tox + KAFKA_VERSION=0.8.1 tox + KAFKA_VERSION=0.8.1.1 tox + KAFKA_VERSION=trunk tox diff --git a/docs/usage.rst b/docs/usage.rst new file mode 100644 index 0000000..141cf93 --- /dev/null +++ b/docs/usage.rst @@ -0,0 +1,124 @@ +Usage +===== + +High level +---------- + +.. code:: python + + from kafka import SimpleProducer, KafkaClient, KafkaConsumer + + # To send messages synchronously + kafka = KafkaClient("localhost:9092") + producer = SimpleProducer(kafka) + + # Note that the application is responsible for encoding messages to type str + producer.send_messages("my-topic", "some message") + producer.send_messages("my-topic", "this method", "is variadic") + + # Send unicode message + producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) + + # To send messages asynchronously + # WARNING: current implementation does not guarantee message delivery on failure! + # messages can get dropped! Use at your own risk! Or help us improve with a PR! + producer = SimpleProducer(kafka, async=True) + producer.send_messages("my-topic", "async message") + + # To wait for acknowledgements + # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to + # a local log before sending response + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(kafka, async=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000) + + response = producer.send_messages("my-topic", "another message") + + if response: + print(response[0].error) + print(response[0].offset) + + # To send messages in batch. You can use any of the available + # producers for doing this. 
The following producer will collect + # messages in batch and send them to Kafka after 20 messages are + # collected or every 60 seconds + # Notes: + # * If the producer dies before the messages are sent, there will be losses + # * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(kafka, batch_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + + # To consume messages + consumer = KafkaConsumer("my-topic", group_id="my_group", + metadata_broker_list=["localhost:9092"]) + for message in consumer: + # message is raw byte string -- decode if necessary! + # e.g., for unicode: `message.decode('utf-8')` + print(message) + + kafka.close() + + +Keyed messages +-------------- + +.. code:: python + + from kafka import (KafkaClient, KeyedProducer, HashedPartitioner, + RoundRobinPartitioner) + + kafka = KafkaClient("localhost:9092") + + # HashedPartitioner is default + producer = KeyedProducer(kafka) + producer.send("my-topic", "key1", "some message") + producer.send("my-topic", "key2", "this methode") + + producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + + +Multiprocess consumer +--------------------- + +.. code:: python + + from kafka import KafkaClient, MultiProcessConsumer + + kafka = KafkaClient("localhost:9092") + + # This will split the number of partitions among two processes + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) + + # This will spawn processes such that each handles 2 partitions max + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", + partitions_per_proc=2) + + for message in consumer: + print(message) + + for message in consumer.get_messages(count=5, block=True, timeout=4): + print(message) + +Low level +--------- + +.. code:: python + + from kafka import KafkaClient, create_message + from kafka.protocol import KafkaProtocol + from kafka.common import ProduceRequest + + kafka = KafkaClient("localhost:9092") + + req = ProduceRequest(topic="my-topic", partition=1, + messages=[create_message("some message")]) + resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) + kafka.close() + + resps[0].topic # "my-topic" + resps[0].partition # 1 + resps[0].error # 0 (hopefully) + resps[0].offset # offset of the first message sent in this request diff --git a/kafka/__init__.py b/kafka/__init__.py index 8ccdb4c..3536084 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -4,7 +4,7 @@ import pkg_resources __version__ = pkg_resources.require('kafka-python')[0].version __author__ = 'David Arthur' __license__ = 'Apache License 2.0' -__copyright__ = 'Copyright 2014, David Arthur under Apache License, v2.0' +__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' from kafka.client import KafkaClient from kafka.conn import KafkaConnection diff --git a/kafka/client.py b/kafka/client.py index 7a0cf18..48a534e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -15,6 +15,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol +from kafka.util import kafka_bytestring log = logging.getLogger("kafka") @@ -30,7 +31,7 @@ class KafkaClient(object): def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap - self.client_id = client_id + self.client_id = kafka_bytestring(client_id) self.timeout = timeout self.hosts = collect_hosts(hosts) @@ -85,7 +86,7 @@ class 
KafkaClient(object): self.load_metadata_for_topics(topic) # If the partition doesn't actually exist, raise - if partition not in self.topic_partitions[topic]: + if partition not in self.topic_partitions.get(topic, []): raise UnknownTopicOrPartitionError(key) # If there's no leader for the partition, raise @@ -131,19 +132,21 @@ class KafkaClient(object): the leader broker for that partition using the supplied encode/decode functions - Params - ====== + Arguments: + payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute + encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments + must accept client_id, correlation_id, and payloads as + keyword arguments + decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes + The response objects must be object-like and have topic + and partition attributes + + Returns: - Return - ====== List of response objects in the same order as the supplied payloads """ @@ -175,8 +178,13 @@ class KafkaClient(object): # Send the request, recv the response try: conn.send(requestId, request) + + # decoder_fn=None signal that the server is expected to not + # send a response. This probably only applies to + # ProduceRequest w/ acks = 0 if decoder_fn is None: continue + try: response = conn.recv(requestId) except ConnectionError as e: @@ -257,9 +265,9 @@ class KafkaClient(object): def get_partition_ids_for_topic(self, topic): if topic not in self.topic_partitions: - return None + return [] - return list(self.topic_partitions[topic]) + return sorted(list(self.topic_partitions[topic])) def ensure_topic_exists(self, topic, timeout = 30): start_time = time.time() @@ -285,9 +293,9 @@ class KafkaClient(object): This method should be called after receiving any error - @param: *topics (optional) - If a list of topics is provided, the metadata refresh will be limited - to the specified topics only. + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. Exceptions: ---------- @@ -384,18 +392,16 @@ class KafkaClient(object): sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified - Params - ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function - - Return - ====== - list of ProduceResponse or callback(ProduceResponse), in the - order of input payloads + Arguments: + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we + encounter an API error? + callback: function, instead of returning the ProduceResponse, + first pass it through this function + + Returns: + list of ProduceResponse or callback(ProduceResponse), in the + order of input payloads """ encoder = functools.partial( diff --git a/kafka/conn.py b/kafka/conn.py index ddfee8b..30debec 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -47,10 +47,11 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. - host: the host name or IP address of a kafka broker - port: the port number the kafka broker is listening on - timeout: default 120. 
The socket timeout for sending and receiving data - in seconds. None means no timeout, so a request can block forever. + Arguments: + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() @@ -116,8 +117,10 @@ class KafkaConnection(local): def send(self, request_id, payload): """ Send a request to Kafka - param: request_id -- can be any int (used only for debug logging...) - param: payload -- an encoded kafka packet (see KafkaProtocol) + + Arguments:: + request_id (int): can be any int (used only for debug logging...) + payload: an encoded kafka packet (see KafkaProtocol) """ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) @@ -135,8 +138,12 @@ class KafkaConnection(local): def recv(self, request_id): """ Get a response packet from Kafka - param: request_id -- can be any int (only used for debug logging...) - returns encoded kafka packet response from server as type str + + Arguments: + request_id: can be any int (only used for debug logging...) + + Returns: + str: Encoded kafka packet response from server """ log.debug("Reading response %d from Kafka" % request_id) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 2464aaf..9cdcf89 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -32,9 +32,11 @@ class Consumer(object): Base class to be used by other consumers. Not to be used directly This base class provides logic for + * initialization and fetching metadata of partitions * Auto-commit logic * APIs for fetching pending message count + """ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, @@ -93,8 +95,9 @@ class Consumer(object): """ Commit offsets for this consumer - partitions: list of partitions to commit, default is to commit - all of them + Keyword Arguments: + partitions (list): list of partitions to commit, default is to commit + all of them """ # short circuit if nothing happened. This check is kept outside @@ -148,7 +151,8 @@ class Consumer(object): """ Gets the pending message count - partitions: list of partitions to check for, default is to check all + Keyword Arguments: + partitions (list): list of partitions to check for, default is to check all """ if not partitions: partitions = self.offsets.keys() diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index f16b526..49ffa7b 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -47,79 +47,86 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } -BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id') - class KafkaConsumer(object): """ A simpler kafka consumer - ``` - # A very basic 'tail' consumer, with no stored offset management - kafka = KafkaConsumer('topic1') - for m in kafka: - print m - - # Alternate interface: next() - print kafka.next() - - # Alternate interface: batch iteration - while True: - for m in kafka.fetch_messages(): - print m - print "Done with batch - let's do another!" 
- ``` - - ``` - # more advanced consumer -- multiple topics w/ auto commit offset management - kafka = KafkaConsumer('topic1', 'topic2', - group_id='my_consumer_group', - auto_commit_enable=True, - auto_commit_interval_ms=30 * 1000, - auto_offset_reset='smallest') - - # Infinite iteration - for m in kafka: - process_message(m) - kafka.task_done(m) - - # Alternate interface: next() - m = kafka.next() - process_message(m) - kafka.task_done(m) - - # If auto_commit_enable is False, remember to commit() periodically - kafka.commit() - - # Batch process interface - while True: - for m in kafka.fetch_messages(): + .. code:: python + + # A very basic 'tail' consumer, with no stored offset management + kafka = KafkaConsumer('topic1', + metadata_broker_list=['localhost:9092']) + for m in kafka: + print m + + # Alternate interface: next() + print kafka.next() + + # Alternate interface: batch iteration + while True: + for m in kafka.fetch_messages(): + print m + print "Done with batch - let's do another!" + + + .. code:: python + + # more advanced consumer -- multiple topics w/ auto commit offset + # management + kafka = KafkaConsumer('topic1', 'topic2', + metadata_broker_list=['localhost:9092'], + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') + + # Infinite iteration + for m in kafka: + process_message(m) + kafka.task_done(m) + + # Alternate interface: next() + m = kafka.next() process_message(m) kafka.task_done(m) - ``` + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() + + # Batch process interface + while True: + for m in kafka.fetch_messages(): + process_message(m) + kafka.task_done(m) + messages (m) are namedtuples with attributes: - m.topic: topic name (str) - m.partition: partition number (int) - m.offset: message offset on topic-partition log (int) - m.key: key (bytes - can be None) - m.value: message (output of deserializer_class - default is raw bytes) + + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) Configuration settings can be passed to constructor, otherwise defaults will be used: - client_id='kafka.consumer.kafka', - group_id=None, - fetch_message_max_bytes=1024*1024, - fetch_min_bytes=1, - fetch_wait_max_ms=100, - refresh_leader_backoff_ms=200, - metadata_broker_list=None, - socket_timeout_ms=30*1000, - auto_offset_reset='largest', - deserializer_class=lambda msg: msg, - auto_commit_enable=False, - auto_commit_interval_ms=60 * 1000, - consumer_timeout_ms=-1 + + .. code:: python + + client_id='kafka.consumer.kafka', + group_id=None, + fetch_message_max_bytes=1024*1024, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + refresh_leader_backoff_ms=200, + metadata_broker_list=None, + socket_timeout_ms=30*1000, + auto_offset_reset='largest', + deserializer_class=lambda msg: msg, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + consumer_timeout_ms=-1 Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi @@ -133,6 +140,9 @@ class KafkaConsumer(object): """ Configuration settings can be passed to constructor, otherwise defaults will be used: + + .. 
code:: python + client_id='kafka.consumer.kafka', group_id=None, fetch_message_max_bytes=1024*1024, @@ -159,13 +169,6 @@ class KafkaConsumer(object): raise KafkaConfigurationError('Unknown configuration key(s): ' + str(list(configs.keys()))) - # Handle str/bytes conversions - for config_key in BYTES_CONFIGURATION_KEYS: - if isinstance(self._config[config_key], six.string_types): - logger.warning("Converting configuration key '%s' to bytes" % - config_key) - self._config[config_key] = self._config[config_key].encode('utf-8') - if self._config['auto_commit_enable']: if not self._config['group_id']: raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') @@ -189,28 +192,35 @@ class KafkaConsumer(object): Optionally specify offsets to start from Accepts types: - str (utf-8): topic name (will consume all available partitions) - tuple: (topic, partition) - dict: { topic: partition } - { topic: [partition list] } - { topic: (partition tuple,) } + + * str (utf-8): topic name (will consume all available partitions) + * tuple: (topic, partition) + * dict: + - { topic: partition } + - { topic: [partition list] } + - { topic: (partition tuple,) } Optionally, offsets can be specified directly: - tuple: (topic, partition, offset) - dict: { (topic, partition): offset, ... } - Ex: - kafka = KafkaConsumer() + * tuple: (topic, partition, offset) + * dict: { (topic, partition): offset, ... } + + Example: + + .. code:: python + + kafka = KafkaConsumer() - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) """ self._topics = [] self._client.load_metadata_for_topics() @@ -309,10 +319,12 @@ class KafkaConsumer(object): Otherwise blocks indefinitely Note that this is also the method called internally during iteration: - ``` - for m in consumer: - pass - ``` + + .. 
code:: python + + for m in consumer: + pass + """ self._set_consumer_timeout_start() while True: @@ -336,11 +348,12 @@ class KafkaConsumer(object): OffsetOutOfRange, per the configured `auto_offset_reset` policy Key configuration parameters: - `fetch_message_max_bytes` - `fetch_max_wait_ms` - `fetch_min_bytes` - `deserializer_class` - `auto_offset_reset` + + * `fetch_message_max_bytes` + * `fetch_max_wait_ms` + * `fetch_min_bytes` + * `deserializer_class` + * `auto_offset_reset` """ max_bytes = self._config['fetch_message_max_bytes'] @@ -408,6 +421,10 @@ class KafkaConsumer(object): offset, message.key, self._config['deserializer_class'](message.value)) + if offset < self._offsets.fetch[topic_partition]: + logger.debug('Skipping message %s because its offset is less than the consumer offset', + msg) + continue # Only increment fetch offset if we safely got the message and deserialized self._offsets.fetch[topic_partition] = offset + 1 @@ -418,20 +435,18 @@ class KafkaConsumer(object): """ Request available fetch offsets for a single topic/partition - @param topic (str) - @param partition (int) - @param request_time_ms (int) -- Used to ask for all messages before a - certain time (ms). There are two special - values. Specify -1 to receive the latest - offset (i.e. the offset of the next coming - message) and -2 to receive the earliest - available offset. Note that because offsets - are pulled in descending order, asking for - the earliest offset will always return you - a single element. - @param max_num_offsets (int) - - @return offsets (list) + Arguments: + topic (str) + partition (int) + request_time_ms (int): Used to ask for all messages before a + certain time (ms). There are two special values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming message) and -2 to receive the earliest + available offset. Note that because offsets are pulled in descending order, asking for + the earliest offset will always return you a single element. + max_num_offsets (int) + + Returns: + offsets (list) """ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] @@ -448,9 +463,12 @@ class KafkaConsumer(object): def offsets(self, group=None): """ - Returns a copy of internal offsets struct - optional param: group [fetch|commit|task_done|highwater] - if no group specified, returns all groups + Keyword Arguments: + group: Either "fetch", "commit", "task_done", or "highwater". + If no group specified, returns all groups. + + Returns: + A copy of internal offsets struct """ if not group: return { @@ -498,8 +516,8 @@ class KafkaConsumer(object): Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group. - Note -- this functionality requires server version >=0.8.1.1 - see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + **Note**: this functionality requires server version >=0.8.1.1 + See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_. 
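The `task_done()`/`commit()` flow documented above can be sketched as follows. This is illustrative only: it assumes a 0.8.1.1+ broker on localhost:9092, an existing topic, and placeholder group/topic names.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         metadata_broker_list=['localhost:9092'],
                         auto_commit_enable=False)

for msg in consumer:
    print(msg)                 # namedtuple: topic, partition, offset, key, value
    consumer.task_done(msg)    # mark this message as processed
    consumer.commit()          # store task_done offsets on the broker (>= 0.8.1.1);
                               # in practice you would commit periodically, not per message
```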
""" if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') @@ -531,7 +549,7 @@ class KafkaConsumer(object): if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request(self._config['group_id'], + resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']), commits, fail_on_error=False) @@ -595,7 +613,7 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], + kafka_bytestring(self._config['group_id']), [OffsetFetchRequest(topic_partition[0], topic_partition[1])], fail_on_error=False) try: diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 912e64b..4dc04dc 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer): A consumer implementation that consumes partitions for a topic in parallel using multiple processes - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. 
""" messages = [] diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..3d250ea 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -8,6 +8,7 @@ import logging import time import six +import sys try: from Queue import Empty, Queue @@ -16,7 +17,9 @@ except ImportError: # python 2 from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -67,24 +70,36 @@ class SimpleConsumer(Consumer): A simple consumer implementation that consumes all/specified partitions for a topic - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + + auto_commit_every_n: default 100. How many messages to consume + before a commit + + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. + + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + Otherwise, do not reset the offsets and raise OffsetOutOfRangeError. 
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -98,7 +113,8 @@ class SimpleConsumer(Consumer): fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + auto_offset_reset='largest'): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +133,38 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.auto_offset_reset = auto_offset_reset self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.auto_offset_reset == 'largest': + reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + elif self.auto_offset_reset == 'smallest': + reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + else: + # Let's raise an reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise + + # send_offset_request + (resp, ) = self.client.send_offset_request(reqs) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -133,11 +175,13 @@ class SimpleConsumer(Consumer): """ Alter the current offset in the consumer, similar to fseek - offset: how much to modify the offset - whence: where to modify it from - 0 is relative to the earliest available offset (head) - 1 is relative to the current offset - 2 is relative to the latest known offset (tail) + Arguments: + offset: how much to modify the offset + whence: where to modify it from + + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) """ if whence == 1: # relative to current position @@ -180,11 +224,12 @@ class SimpleConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. 
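The `seek()` semantics above (whence 0/1/2) in a short illustrative snippet; broker and topic names are placeholders, and the offsets used here are arbitrary:

```python
from kafka import KafkaClient, SimpleConsumer

client = KafkaClient('localhost:9092')
consumer = SimpleConsumer(client, b'my-group', b'my-topic')

consumer.seek(0, 0)     # whence=0: jump to the earliest available offset (head)
consumer.seek(5, 1)     # whence=1: skip 5 messages forward from the current position
consumer.seek(-10, 2)   # whence=2: position 10 messages before the latest offset (tail)

# Pull at most 10 of the rewound messages, waiting no more than 5 seconds
for message in consumer.get_messages(count=10, block=True, timeout=5):
    print(message)
```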
""" messages = [] if timeout is not None: @@ -286,14 +331,35 @@ class SimpleConsumer(Consumer): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + # Retry this partition + retry_partitions[resp.partition] = partitions[resp.partition] + continue + partition = resp.partition buffer_size = partitions[partition] try: for message in resp.messages: + if message.offset < self.fetch_offsets[partition]: + log.debug('Skipping message %s because its offset is less than the consumer offset', + message) + continue # Put the message in our queue self.queue.put((partition, message)) self.fetch_offsets[partition] = message.offset + 1 diff --git a/kafka/context.py b/kafka/context.py index 98ed7b3..ade4db8 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -18,6 +18,8 @@ class OffsetCommitContext(object): Example: + .. code:: python + consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() consumer.fetch_last_known_offsets() @@ -57,7 +59,10 @@ class OffsetCommitContext(object): In order to know the current partition, it is helpful to initialize the consumer to provide partition info via: + .. code:: python + consumer.provide_partition_info() + """ max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py index c62b7ed..857f634 100644 --- a/kafka/partitioner/base.py +++ b/kafka/partitioner/base.py @@ -7,17 +7,18 @@ class Partitioner(object): """ Initialize the partitioner - partitions - A list of available partitions (during startup) + Arguments: + partitions: A list of available partitions (during startup) """ self.partitions = partitions - def partition(self, key, partitions): + def partition(self, key, partitions=None): """ Takes a string key and num_partitions as argument and returns a partition to be used for the message - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing + Arguments: + key: the key to use for partitioning + partitions: (optional) a list of partitions. 
""" raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index 587a3de..fb5e598 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -5,7 +5,9 @@ class HashedPartitioner(Partitioner): Implements a partitioner which selects the target partition based on the hash of the key """ - def partition(self, key, partitions): + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions size = len(partitions) idx = hash(key) % size diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py index 54d00da..6439e53 100644 --- a/kafka/partitioner/roundrobin.py +++ b/kafka/partitioner/roundrobin.py @@ -15,9 +15,9 @@ class RoundRobinPartitioner(Partitioner): self.partitions = partitions self.iterpart = cycle(partitions) - def partition(self, key, partitions): + def partition(self, key, partitions=None): # Refresh the partition list if necessary - if self.partitions != partitions: + if partitions and self.partitions != partitions: self._set_partitions(partitions) return next(self.iterpart) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92..695f195 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -85,20 +85,20 @@ class Producer(object): """ Base class to be used by producers - Params: - client - The Kafka client instance to use - async - If set to true, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + async: If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these WARNING!!! current implementation of async producer does not guarantee message delivery. Use at your own risk! Or help us improve with a PR! - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ ACK_NOT_REQUIRED = 0 # No ack is required @@ -127,6 +127,7 @@ class Producer(object): self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout + self.stopped = False if codec is None: codec = CODEC_NONE @@ -212,3 +213,8 @@ class Producer(object): if self.proc.is_alive(): self.proc.terminate() + self.stopped = True + + def __del__(self): + if not self.stopped: + self.stop() diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d9..36328ed 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -15,17 +15,19 @@ class KeyedProducer(Producer): """ A producer which distributes messages to partitions based on the key - Args: - client - The kafka client instance - partitioner - A partitioner class that will be used to get the partition - to send the message to. 
Must be derived from Partitioner - async - If True, the messages are sent asynchronously via another + Arguments: + client: The kafka client instance + + Keyword Arguments: + partitioner: A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, @@ -52,7 +54,7 @@ class KeyedProducer(Producer): self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) partitioner = self.partitioners[topic] - return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) + return partitioner.partition(key) def send_messages(self,topic,key,*msg): partition = self._next_partition(topic, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 401b79b..2699cf2 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import random +import six from itertools import cycle @@ -19,21 +20,23 @@ class SimpleProducer(Producer): """ A simple, round-robin producer. Each message goes to exactly one partition - Params: - client - The Kafka client instance to use - async - If True, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + + Keyword Arguments: + async: If True, the messages are sent asynchronously via another thread (process). 
We will not wait for a response to these - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - random_start - If true, randomize the initial partition which the - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout + random_start: If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, @@ -66,8 +69,13 @@ class SimpleProducer(Producer): return next(self.partition_cycles[topic]) def send_messages(self, topic, *msg): + if not isinstance(topic, six.binary_type): + topic = topic.encode('utf-8') + partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages(topic, partition, *msg) + return super(SimpleProducer, self).send_messages( + topic, partition, *msg + ) def __repr__(self): return '<SimpleProducer batch=%s>' % self.async diff --git a/kafka/protocol.py b/kafka/protocol.py index a85c7eb..2a39de6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -185,18 +185,18 @@ class KafkaProtocol(object): """ Encode some ProduceRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of ProduceRequest - acks: How "acky" you want the request to be - 0: immediate response - 1: written to disk by the leader - 2+: waits for this many number of replicas to sync - -1: waits for all replicas to be in sync - timeout: Maximum time the server will wait for acks from replicas. - This is _not_ a socket timeout + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. 
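The `SimpleProducer.send_messages()` change above now encodes text topic names to utf-8 bytes before choosing a partition. A minimal sketch, with a placeholder broker address and topic; message payloads must still be byte strings:

```python
from kafka import KafkaClient, SimpleProducer

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client, random_start=False)

# The unicode topic name is converted to b'my-topic' before partitioning;
# encoding the message payloads remains the caller's responsibility.
producer.send_messages(u'my-topic', b'first message', b'second message')
```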
+ This is _not_ a socket timeout + """ payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) @@ -225,9 +225,9 @@ class KafkaProtocol(object): """ Decode bytes to a ProduceResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode + """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -248,14 +248,13 @@ class KafkaProtocol(object): """ Encodes some FetchRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of FetchRequest - max_wait_time: int, how long to block waiting on min_bytes of data - min_bytes: int, the minimum number of bytes to accumulate before - returning the response + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response """ payloads = [] if payloads is None else payloads @@ -284,9 +283,8 @@ class KafkaProtocol(object): """ Decode bytes to a FetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -333,9 +331,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -360,11 +357,10 @@ class KafkaProtocol(object): """ Encode a MetadataRequest - Params - ====== - client_id: string - correlation_id: int - topics: list of strings + Arguments: + client_id: string + correlation_id: int + topics: list of strings """ if payloads is None: topics = [] if topics is None else topics @@ -388,9 +384,8 @@ class KafkaProtocol(object): """ Decode bytes to a MetadataResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) @@ -439,12 +434,11 @@ class KafkaProtocol(object): """ Encode some OffsetCommitRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -470,9 +464,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetCommitResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -491,12 +484,11 @@ class KafkaProtocol(object): """ Encode some OffsetFetchRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -522,9 +514,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetFetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) @@ 
-547,10 +538,10 @@ def create_message(payload, key=None): """ Construct a Message - Params - ====== - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ return Message(0, 0, key, payload) @@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) @@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) diff --git a/kafka/queue.py b/kafka/queue.py deleted file mode 100644 index ada495f..0000000 --- a/kafka/queue.py +++ /dev/null @@ -1,219 +0,0 @@ -from __future__ import absolute_import - -from copy import copy -import logging -from multiprocessing import Process, Queue, Event -from Queue import Empty -import time - -from kafka.client import KafkaClient, FetchRequest, ProduceRequest - -log = logging.getLogger("kafka") - -raise NotImplementedError("Still need to refactor this class") - - -class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, - consumer_fetch_size=1024, consumer_sleep=200): - self.client = copy(client) - self.topic = topic - self.partition = partition - self.out_queue = out_queue - self.barrier = barrier - self.consumer_fetch_size = consumer_fetch_size - self.consumer_sleep = consumer_sleep / 1000. 
- log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaConsumerProcess: topic=%s, \ - partition=%s, sleep=%s]" % \ - (self.topic, self.partition, self.consumer_sleep) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, - offset=0, size=self.consumer_fetch_size) - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s" % self) - self.client.close() - break - - lastOffset = fetchRequest.offset - (messages, fetchRequest) = self.client.get_message_set(fetchRequest) - - if fetchRequest.offset == lastOffset: - log.debug("No more data for this partition, " - "sleeping a bit (200ms)") - time.sleep(self.consumer_sleep) - continue - - for message in messages: - self.out_queue.put(message) - - -class KafkaProducerProcess(Process): - def __init__(self, client, topic, in_queue, barrier, - producer_flush_buffer=500, - producer_flush_timeout=2000, - producer_timeout=100): - - self.client = copy(client) - self.topic = topic - self.in_queue = in_queue - self.barrier = barrier - self.producer_flush_buffer = producer_flush_buffer - self.producer_flush_timeout = producer_flush_timeout / 1000. - self.producer_timeout = producer_timeout / 1000. - log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaProducerProcess: topic=%s, \ - flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \ - (self.topic, - self.producer_flush_buffer, - self.producer_flush_timeout, - self.producer_timeout) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - messages = [] - last_produce = time.time() - - def flush(messages): - self.client.send_message_set(ProduceRequest(self.topic, -1, - messages)) - del messages[:] - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s, flushing messages" % self) - flush(messages) - self.client.close() - break - - if len(messages) > self.producer_flush_buffer: - log.debug("Message count threshold reached. Flushing messages") - flush(messages) - last_produce = time.time() - - elif (time.time() - last_produce) > self.producer_flush_timeout: - log.debug("Producer timeout reached. Flushing messages") - flush(messages) - last_produce = time.time() - - try: - msg = KafkaClient.create_message( - self.in_queue.get(True, self.producer_timeout)) - messages.append(msg) - - except Empty: - continue - - -class KafkaQueue(object): - def __init__(self, client, topic, partitions, - producer_config=None, consumer_config=None): - """ - KafkaQueue a Queue-like object backed by a Kafka producer and some - number of consumers - - Messages are eagerly loaded by the consumer in batches of size - consumer_fetch_size. - Messages are buffered in the producer thread until - producer_flush_timeout or producer_flush_buffer is reached. - - Params - ====== - client: KafkaClient object - topic: str, the topic name - partitions: list of ints, the partions to consume from - producer_config: dict, see below - consumer_config: dict, see below - - Consumer Config - =============== - consumer_fetch_size: int, number of bytes to fetch in one call - to Kafka. Default is 1024 - consumer_sleep: int, time in milliseconds a consumer should sleep - when it reaches the end of a partition. Default is 200 - - Producer Config - =============== - producer_timeout: int, time in milliseconds a producer should - wait for messages to enqueue for producing. 
- Default is 100 - producer_flush_timeout: int, time in milliseconds a producer - should allow messages to accumulate before - sending to Kafka. Default is 2000 - producer_flush_buffer: int, number of messages a producer should - allow to accumulate. Default is 500 - - """ - producer_config = {} if producer_config is None else producer_config - consumer_config = {} if consumer_config is None else consumer_config - - self.in_queue = Queue() - self.out_queue = Queue() - self.consumers = [] - self.barrier = Event() - - # Initialize and start consumer threads - for partition in partitions: - consumer = KafkaConsumerProcess(client, topic, partition, - self.in_queue, self.barrier, - **consumer_config) - consumer.start() - self.consumers.append(consumer) - - # Initialize and start producer thread - self.producer = KafkaProducerProcess(client, topic, self.out_queue, - self.barrier, **producer_config) - self.producer.start() - - # Trigger everything to start - self.barrier.set() - - def get(self, block=True, timeout=None): - """ - Consume a message from Kafka - - Params - ====== - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - - Returns - ======= - msg: str, the payload from Kafka - """ - return self.in_queue.get(block, timeout).payload - - def put(self, msg, block=True, timeout=None): - """ - Send a message to Kafka - - Params - ====== - msg: std, the message to send - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - """ - self.out_queue.put(msg, block, timeout) - - def close(self): - """ - Close the internal queues and Kafka consumers/producer - """ - self.in_queue.close() - self.out_queue.close() - self.barrier.clear() - self.producer.join() - for consumer in self.consumers: - consumer.join() diff --git a/kafka/util.py b/kafka/util.py index 72ac521..78c3607 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -103,10 +103,12 @@ class ReentrantTimer(object): A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function + Arguments: + + t: timer interval in milliseconds + fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ def __init__(self, t, fn, *args, **kwargs): @@ -124,7 +126,11 @@ class ReentrantTimer(object): self.active = None def _timer(self, active): - while not active.wait(self.t): + # python2.6 Event.wait() always returns None + # python2.7 and greater returns the flag value (true/false) + # we want the flag value, so add an 'or' here for python2.6 + # this is redundant for later python versions (FLAG OR FLAG == FLAG) + while not (active.wait(self.t) or active.is_set()): self.fn(*self.args, **self.kwargs) def start(self): @@ -144,3 +150,7 @@ class ReentrantTimer(object): self.thread.join(self.t + 1) # noinspection PyAttributeOutsideInit self.timer = None + self.fn = None + + def __del__(self): + self.stop() @@ -1,5 +1,5 @@ import sys - +import os from setuptools import setup, Command with open('VERSION', 'r') as v: @@ -26,6 +26,10 @@ test_require = ['tox', 'mock'] if sys.version_info < (2, 7): test_require.append('unittest2') +here = os.path.abspath(os.path.dirname(__file__)) + +with open(os.path.join(here, 'README.rst')) as f: + README = f.read() setup( name="kafka-python", @@ -46,15 +50,10 @@ setup( 
url="https://github.com/mumrah/kafka-python", license="Apache License 2.0", description="Pure Python client for Apache Kafka", - long_description=""" -This module provides low-level protocol support for Apache Kafka as well as -high-level consumer and producer classes. Request batching is supported by the -protocol as well as broker-aware request routing. Gzip and Snappy compression -is also supported for message sets. -""", + long_description=README, keywords="apache kafka", install_requires=['six'], - classifiers = [ + classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", @@ -62,6 +61,9 @@ is also supported for message sets. "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.6", "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", ] diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4723220..9c89190 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -5,7 +5,7 @@ from six.moves import xrange from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout + ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions('all') + def test_simple_consumer_smallest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + consumer = self.consumer(auto_offset_reset='smallest') + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to smallest we should read all 200 + # messages from beginning. + self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_largest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer() + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to largest we should not read any + # messages. + self.assert_message_count([message for message in consumer], 0) + # Send 200 new messages to the queue + self.send_messages(0, range(200, 300)) + self.send_messages(1, range(300, 400)) + # Since the offset is set to largest we should read all the new messages. 
+ self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_no_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer(auto_offset_reset=None) + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + with self.assertRaises(OffsetOutOfRangeError): + consumer.get_message() + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index ca71f2d..7d27526 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -7,6 +7,7 @@ from . import unittest from kafka import KafkaClient, SimpleConsumer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer.base import Producer +from kafka.producer import KeyedProducer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -17,8 +18,7 @@ from test.testutil import ( class TestFailover(KafkaIntegrationTestCase): create_client = False - @classmethod - def setUpClass(cls): # noqa + def setUp(self): if not os.environ.get('KAFKA_VERSION'): return @@ -27,33 +27,41 @@ class TestFailover(KafkaIntegrationTestCase): partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] - cls.client = KafkaClient(hosts) + hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers] + self.client = KafkaClient(hosts) + super(TestFailover, self).setUp() - @classmethod - def tearDownClass(cls): + def tearDown(self): + super(TestFailover, self).tearDown() if not os.environ.get('KAFKA_VERSION'): return - cls.client.close() - for broker in cls.brokers: + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() @kafka_versions("all") def test_switch_leader(self): topic = self.topic partition = 0 - # Test the base class Producer -- send_messages to a specific partition + # Testing the base Producer class here so that we can easily send + # messages to a specific partition, kill the leader for that partition + # and check that after another broker takes leadership the producer + # is able to resume sending messages + + # require that the server commit messages to all in-sync replicas + # so that failover doesn't lose any messages on server-side + # and we can assert that server-side message count equals client-side producer = Producer(self.client, async=False, req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) - # Send 10 random messages + # Send 100 random messages to a specific partition self._send_random_messages(producer, topic, partition, 100) # kill leader for partition @@ -80,7 +88,7 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 100) # count number of messages - # Should be equal to 10 before + 1 recovery + 10 after + # Should be equal to 100 before + 1 recovery + 100 after self.assert_message_count(topic, 201, partitions=(partition,)) @@ -116,6 +124,45 @@ class 
TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,)) + @kafka_versions("all") + def test_switch_leader_keyed_producer(self): + topic = self.topic + + producer = KeyedProducer(self.client, async=False) + + # Send 10 random messages + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + # kill leader for partition 0 + self._kill_leader(topic, 0) + + recovered = False + started = time.time() + timeout = 60 + while not recovered and (time.time() - started) < timeout: + try: + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + if producer.partitioners[topic].partition(key) == 0: + recovered = True + except (FailedPayloadsError, ConnectionError): + logging.debug("caught exception sending message -- will retry") + continue + + # Verify we successfully sent the message + self.assertTrue(recovered) + + # send some more messages just to make sure no more exceptions + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + def _send_random_messages(self, producer, topic, partition, n): for j in range(n): logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) diff --git a/test/test_producer.py b/test/test_producer.py index caf8fe3..f6b3d6a 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,6 +7,7 @@ from . import unittest from kafka.producer.base import Producer + class TestKafkaProducer(unittest.TestCase): def test_producer_message_types(self): @@ -25,3 +26,17 @@ class TestKafkaProducer(unittest.TestCase): # This should not raise an exception producer.send_messages(topic, partition, m) + def test_topic_message_types(self): + from kafka.producer.simple import SimpleProducer + + client = MagicMock() + + def partitions(topic): + return [0, 1] + + client.get_partition_ids_for_topic = partitions + + producer = SimpleProducer(client, random_start=False) + topic = b"test-topic" + producer.send_messages(topic, b'hi') + assert client.send_produce_request.called diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 19d28bd..38df69f 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -14,12 +14,12 @@ from kafka.common import ( FetchRequest, ProduceRequest, UnknownTopicOrPartitionError, LeaderNotAvailableError ) +from kafka.producer.base import Producer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions class TestKafkaProducerIntegration(KafkaIntegrationTestCase): - topic = b'produce_topic' @classmethod def setUpClass(cls): # noqa @@ -140,25 +140,26 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_simple_producer(self): - start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) + partitions = self.client.get_partition_ids_for_topic(self.topic) + start_offsets = [self.current_offset(self.topic, p) for p in partitions] + producer = SimpleProducer(self.client, random_start=False) # Goes to first partition, randomly. resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) - self.assert_produce_response(resp, start_offset0) + self.assert_produce_response(resp, start_offsets[0]) # Goes to the next partition, randomly. 
        resp = producer.send_messages(self.topic, self.msg("three"))
-        self.assert_produce_response(resp, start_offset1)
+        self.assert_produce_response(resp, start_offsets[1])

-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])

        # Goes back to the first partition because there's only two partitions
        resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-        self.assert_produce_response(resp, start_offset0+2)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+        self.assert_produce_response(resp, start_offsets[0]+2)
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])

        producer.stop()
@@ -194,110 +195,38 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assertEqual(resp3[0].partition, 0)

    @kafka_versions("all")
-    def test_round_robin_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
-
-        self.assert_produce_response(resp1, start_offset0+0)
-        self.assert_produce_response(resp2, start_offset1+0)
-        self.assert_produce_response(resp3, start_offset0+1)
-        self.assert_produce_response(resp4, start_offset1+1)
-
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_hashed_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
-        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
-        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
-
-        offsets = {0: start_offset0, 1: start_offset1}
-        messages = {0: [], 1: []}
-
-        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
-        resps = [resp1, resp2, resp3, resp4, resp5]
-        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
-        for key, resp, msg in zip(keys, resps, msgs):
-            k = hash(key) % 2
-            offset = offsets[k]
-            self.assert_produce_response(resp, offset)
-            offsets[k] += 1
-            messages[k].append(msg)
-
-        self.assert_fetch_offset(0, start_offset0, messages[0])
-        self.assert_fetch_offset(1, start_offset1, messages[1])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_none(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+    def test_async_simple_producer(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)

-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
-                                  random_start=False)
+        producer = SimpleProducer(self.client, async=True, random_start=False)
        resp = producer.send_messages(self.topic, self.msg("one"))
        self.assertEqual(len(resp), 0)

-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_local_write(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                                  random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)

-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_cluster_commit(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(
-            self.client,
-            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
-            random_start=False)
-
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

        producer.stop()

    @kafka_versions("all")
    def test_batched_simple_producer__triggers_by_message(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]

+        # Configure batch producer
+        batch_messages = 5
+        batch_interval = 5
        producer = SimpleProducer(
            self.client,
            batch_send=True,
-            batch_send_every_n=5,
-            batch_send_every_t=20,
+            batch_send_every_n=batch_messages,
+            batch_send_every_t=batch_interval,
            random_start=False)

-        # Send 5 messages and do a fetch
+        # Send 4 messages -- should not trigger a batch
        resp = producer.send_messages(self.topic,
            self.msg("one"),
            self.msg("two"),
@@ -309,9 +238,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assertEqual(len(resp), 0)

        # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])

+        # send 3 more messages -- should trigger batch on first 5
        resp = producer.send_messages(self.topic,
            self.msg("five"),
            self.msg("six"),
@@ -321,30 +251,32 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        # Batch mode is async. No ack
        self.assertEqual(len(resp), 0)

-        self.assert_fetch_offset(0, start_offset0, [
+        # send messages groups all *msgs in a single call to the same partition
+        # so we should see all messages from the first call in one partition
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
            self.msg("one"),
            self.msg("two"),
            self.msg("three"),
            self.msg("four"),
        ])

-        self.assert_fetch_offset(1, start_offset1, [
+        # Because we are batching every 5 messages, we should only see one
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
            self.msg("five"),
-            # self.msg("six"),
-            # self.msg("seven"),
        ])

        producer.stop()

    @kafka_versions("all")
    def test_batched_simple_producer__triggers_by_time(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]

+        batch_interval = 5
        producer = SimpleProducer(self.client,
            batch_send=True,
            batch_send_every_n=100,
-            batch_send_every_t=5,
+            batch_send_every_t=batch_interval,
            random_start=False)

        # Send 5 messages and do a fetch
@@ -359,8 +291,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assertEqual(len(resp), 0)

        # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])

        resp = producer.send_messages(self.topic,
            self.msg("five"),
@@ -372,16 +304,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        self.assertEqual(len(resp), 0)

        # Wait the timeout out
-        time.sleep(5)
+        time.sleep(batch_interval)

-        self.assert_fetch_offset(0, start_offset0, [
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
            self.msg("one"),
            self.msg("two"),
            self.msg("three"),
            self.msg("four"),
        ])

-        self.assert_fetch_offset(1, start_offset1, [
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
            self.msg("five"),
            self.msg("six"),
            self.msg("seven"),
@@ -389,40 +321,146 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer.stop()

+
+    ############################
+    #   KeyedProducer Tests    #
+    ############################
+
    @kafka_versions("all")
-    def test_async_simple_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+    def test_round_robin_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]

-        producer = SimpleProducer(self.client, async=True, random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEqual(len(resp), 0)
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+
+        self.assert_produce_response(resp1, start_offsets[0]+0)
+        self.assert_produce_response(resp2, start_offsets[1]+0)
+        self.assert_produce_response(resp3, start_offsets[0]+1)
+        self.assert_produce_response(resp4, start_offsets[1]+1)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_hashed_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
+        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+
+        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
+        messages = {partitions[0]: [], partitions[1]: []}
+
+        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
+        resps = [resp1, resp2, resp3, resp4, resp5]
+        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
+
+        for key, resp, msg in zip(keys, resps, msgs):
+            k = hash(key) % 2
+            partition = partitions[k]
+            offset = offsets[partition]
+            self.assert_produce_response(resp, offset)
+            offsets[partition] += 1
+            messages[partition].append(msg)

-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])

        producer.stop()

    @kafka_versions("all")
    def test_async_keyed_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)

        producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
        resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
        self.assertEqual(len(resp), 0)

-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    ############################
+    #    Producer ACK Tests    #
+    ############################
+
+    @kafka_versions("all")
+    def test_acks_none(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_NOT_REQUIRED,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        # No response from produce request with no acks required
+        self.assertEqual(len(resp), 0)
+
+        # But the message should still have been delivered
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_local_write(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_cluster_commit(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+        )
+
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

        producer.stop()

-    def assert_produce_request(self, messages, initial_offset, message_ct):
-        produce = ProduceRequest(self.topic, 0, messages=messages)
+    def assert_produce_request(self, messages, initial_offset, message_ct,
+                               partition=0):
+        produce = ProduceRequest(self.topic, partition, messages=messages)

        # There should only be one response message from the server.
        # This will throw an exception if there's more than one.
        resp = self.client.send_produce_request([ produce ])
        self.assert_produce_response(resp, initial_offset)
-        self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)

    def assert_produce_response(self, resp, initial_offset):
        self.assertEqual(len(resp), 1)
@@ -1,5 +1,6 @@
[tox]
-envlist = lint, py26, py27, pypy, py33, py34
+envlist = lint, py26, py27, pypy, py33, py34, docs
+
[testenv]
deps =
    six
@@ -36,4 +37,14 @@ deps =
    unittest2
    mock
    pylint
-commands = pylint {posargs: -E --ignore=queue.py kafka test}
+commands = pylint {posargs: -E kafka test}
+
+[testenv:docs]
+deps =
+    sphinxcontrib-napoleon
+    sphinx_rtd_theme
+    sphinx
+
+commands =
+    sphinx-apidoc -o docs/apidoc/ kafka/
+    sphinx-build -b html docs/ docs/_build
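The refactored ACK tests above drive the `Producer` base class directly and pass an explicit partition id to `send_messages`, instead of relying on `SimpleProducer`'s partition selection. The sketch below mirrors that pattern outside the test harness; the broker address, topic name, and `Producer` import path are illustrative assumptions rather than anything taken from this diff, and the topic is assumed to already exist.

```python
# Illustrative sketch of the pattern exercised by the new test_acks_* tests.
# Assumptions: a broker at localhost:9092, an existing topic b"my-topic",
# and that Producer is importable from kafka.producer.base.
from kafka import KafkaClient
from kafka.producer.base import Producer  # assumed import path

client = KafkaClient("localhost:9092")

# Pick an explicit partition, as the refactored tests do, rather than
# letting SimpleProducer choose one.
partition = client.get_partition_ids_for_topic(b"my-topic")[0]

# req_acks controls when the broker acknowledges the produce request:
#   ACK_NOT_REQUIRED         -- no broker response; send_messages returns []
#   ACK_AFTER_LOCAL_WRITE    -- ack once the leader has written to its log
#   ACK_AFTER_CLUSTER_COMMIT -- ack once all in-sync replicas have the message
producer = Producer(client, req_acks=Producer.ACK_AFTER_LOCAL_WRITE)

resp = producer.send_messages(b"my-topic", partition, b"some message")
if resp:
    # With acks required, the response reports the offset assigned to the message.
    print(resp[0].offset)

producer.stop()
```

Similarly, the new `[testenv:docs]` environment in tox.ini should allow the Sphinx API docs to be built locally (for example via `tox -e docs`), producing HTML under `docs/_build`.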