-rw-r--r--  .gitignore                        |   1
-rw-r--r--  .travis.yml                       |   6
-rw-r--r--  CHANGES.md                        |  23
-rw-r--r--  LICENSE                           |   2
-rw-r--r--  README.md                         | 264
-rw-r--r--  README.rst                        |  53
-rw-r--r--  VERSION                           |   2
-rwxr-xr-x  build_integration.sh              |  16
-rw-r--r--  docs/Makefile                     | 177
-rw-r--r--  docs/apidoc/kafka.consumer.rst    |  46
-rw-r--r--  docs/apidoc/kafka.partitioner.rst |  38
-rw-r--r--  docs/apidoc/kafka.producer.rst    |  38
-rw-r--r--  docs/apidoc/kafka.rst             |  79
-rw-r--r--  docs/apidoc/modules.rst           |   7
-rw-r--r--  docs/conf.py                      | 272
-rw-r--r--  docs/index.rst                    |  58
-rw-r--r--  docs/install.rst                  |  79
-rw-r--r--  docs/make.bat                     | 242
-rw-r--r--  docs/requirements.txt             |   7
-rw-r--r--  docs/tests.rst                    |  59
-rw-r--r--  docs/usage.rst                    | 124
-rw-r--r--  kafka/__init__.py                 |   2
-rw-r--r--  kafka/client.py                   |  62
-rw-r--r--  kafka/conn.py                     |  23
-rw-r--r--  kafka/consumer/base.py            |  10
-rw-r--r--  kafka/consumer/kafka.py           | 246
-rw-r--r--  kafka/consumer/multiprocess.py    |  39
-rw-r--r--  kafka/consumer/simple.py          | 128
-rw-r--r--  kafka/context.py                  |   5
-rw-r--r--  kafka/partitioner/base.py         |  11
-rw-r--r--  kafka/partitioner/hashed.py       |   4
-rw-r--r--  kafka/partitioner/roundrobin.py   |   4
-rw-r--r--  kafka/producer/base.py            |  26
-rw-r--r--  kafka/producer/keyed.py           |  24
-rw-r--r--  kafka/producer/simple.py          |  38
-rw-r--r--  kafka/protocol.py                 | 125
-rw-r--r--  kafka/queue.py                    | 219
-rw-r--r--  kafka/util.py                     |  20
-rw-r--r--  setup.py                          |  18
-rw-r--r--  test/test_consumer_integration.py |  44
-rw-r--r--  test/test_failover_integration.py |  77
-rw-r--r--  test/test_producer.py             |  15
-rw-r--r--  test/test_producer_integration.py | 282
-rw-r--r--  tox.ini                           |  15
44 files changed, 2071 insertions, 959 deletions
diff --git a/.gitignore b/.gitignore
index 8cf9c4e..30d663d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,4 @@ env
servers/*/kafka-bin
.coverage
.noseids
+docs/_build
diff --git a/.travis.yml b/.travis.yml
index f4fc66b..7184bc8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,13 +12,14 @@ env:
- KAFKA_VERSION=0.8.0
- KAFKA_VERSION=0.8.1
- KAFKA_VERSION=0.8.1.1
+ - KAFKA_VERSION=0.8.2.0
before_install:
- sudo apt-get install libsnappy-dev
- ./build_integration.sh
install:
- - pip install tox
+ - pip install tox coveralls
- pip install .
# Deal with issue on Travis builders re: multiprocessing.Queue :(
# See https://github.com/travis-ci/travis-cookbooks/issues/155
@@ -38,3 +39,6 @@ deploy:
script:
- if [ -n "$UNIT_AND_LINT_ONLY" ]; then tox -e lint,`./travis_selector.sh $TRAVIS_PYTHON_VERSION`; else tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`; fi
+
+after_success:
+ - coveralls
diff --git a/CHANGES.md b/CHANGES.md
index a88a690..5704afa 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,26 @@
-# 0.9.3 (Unreleased)
+# 0.9.3 (Feb 3, 2015)
+* Add coveralls.io support (sontek PR 307)
+* Fix python2.6 threading.Event bug in ReentrantTimer (dpkp PR 312)
+* Add kafka 0.8.2.0 to travis integration tests (dpkp PR 310)
+* Auto-convert topics to utf-8 bytes in Producer (sontek PR 306)
+* Fix reference cycle between SimpleConsumer and ReentrantTimer (zhaopengzp PR 309)
+* Add Sphinx API docs (wedaly PR 282)
+* Handle additional error cases exposed by 0.8.2.0 kafka server (dpkp PR 295)
+* Refactor error class management (alexcb PR 289)
+* Expose KafkaConsumer in __all__ for easy imports (Dinoshauer PR 286)
+* SimpleProducer starts on random partition by default (alexcb PR 288)
+* Add keys to compressed messages (meandthewallaby PR 281)
+* Add new high-level KafkaConsumer class based on java client api (dpkp PR 234)
+* Add KeyedProducer.send_messages api (pubnub PR 277)
+* Fix consumer pending() method (jettify PR 276)
+* Update low-level demo in README (sunisdown PR 274)
+* Include key in KeyedProducer messages (se7entyse7en PR 268)
+* Fix SimpleConsumer timeout behavior in get_messages (dpkp PR 238)
+* Fix error in consumer.py test against max_buffer_size (rthille/wizzat PR 225/242)
+* Improve string concat performance on pypy / py3 (dpkp PR 233)
+* Reorg directory layout for consumer/producer/partitioners (dpkp/wizzat PR 232/243)
+* Add OffsetCommitContext (locationlabs PR 217)
* Metadata Refactor (dpkp PR 223)
* Add Python 3 support (brutasse/wizzat - PR 227)
* Minor cleanups - imports / README / PyPI classifiers (dpkp - PR 221)
diff --git a/LICENSE b/LICENSE
index f1b6938..412a2b6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2014 David Arthur
+ Copyright 2015 David Arthur
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/README.md b/README.md
deleted file mode 100644
index 6655b92..0000000
--- a/README.md
+++ /dev/null
@@ -1,264 +0,0 @@
-# Kafka Python client
-
-[![Build Status](https://api.travis-ci.org/mumrah/kafka-python.png?branch=master)](https://travis-ci.org/mumrah/kafka-python)
-
-This module provides low-level protocol support for Apache Kafka as well as
-high-level consumer and producer classes. Request batching is supported by the
-protocol as well as broker-aware request routing. Gzip and Snappy compression
-is also supported for message sets.
-
-http://kafka.apache.org/
-
-On Freenode IRC at #kafka-python, as well as #apache-kafka
-
-For general discussion of kafka-client design and implementation (not python specific),
-see https://groups.google.com/forum/m/#!forum/kafka-clients
-
-# License
-
-Copyright 2014, David Arthur under Apache License, v2.0. See `LICENSE`
-
-# Status
-
-The current stable version of this package is [**0.9.2**](https://github.com/mumrah/kafka-python/releases/tag/v0.9.2) and is compatible with
-
-Kafka broker versions
-- 0.8.0
-- 0.8.1
-- 0.8.1.1
-
-Python versions
-- 2.6 (tested on 2.6.9)
-- 2.7 (tested on 2.7.8)
-- pypy (tested on pypy 2.3.1 / python 2.7.6)
-- (Python 3.3 and 3.4 support has been added to trunk and will be available the next release)
-
-# Usage
-
-## High level
-
-```python
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-# To send messages synchronously
-kafka = KafkaClient("localhost:9092")
-producer = SimpleProducer(kafka)
-
-# Note that the application is responsible for encoding messages to type str
-producer.send_messages("my-topic", "some message")
-producer.send_messages("my-topic", "this method", "is variadic")
-
-# Send unicode message
-producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
-
-# To send messages asynchronously
-# WARNING: current implementation does not guarantee message delivery on failure!
-# messages can get dropped! Use at your own risk! Or help us improve with a PR!
-producer = SimpleProducer(kafka, async=True)
-producer.send_messages("my-topic", "async message")
-
-# To wait for acknowledgements
-# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
-# a local log before sending response
-# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
-# by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, async=False,
- req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=2000)
-
-response = producer.send_messages("my-topic", "another message")
-
-if response:
- print(response[0].error)
- print(response[0].offset)
-
-# To send messages in batch. You can use any of the available
-# producers for doing this. The following producer will collect
-# messages in batch and send them to Kafka after 20 messages are
-# collected or every 60 seconds
-# Notes:
-# * If the producer dies before the messages are sent, there will be losses
-# * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, batch_send=True,
- batch_send_every_n=20,
- batch_send_every_t=60)
-
-# To consume messages
-consumer = SimpleConsumer(kafka, "my-group", "my-topic")
-for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
-
-kafka.close()
-```
-
-## Keyed messages
-```python
-from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner
-
-kafka = KafkaClient("localhost:9092")
-
-# HashedPartitioner is default
-producer = KeyedProducer(kafka)
-producer.send("my-topic", "key1", "some message")
-producer.send("my-topic", "key2", "this methode")
-
-producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-```
-
-## Multiprocess consumer
-```python
-from kafka import KafkaClient, MultiProcessConsumer
-
-kafka = KafkaClient("localhost:9092")
-
-# This will split the number of partitions among two processes
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
-
-# This will spawn processes such that each handles 2 partitions max
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
- partitions_per_proc=2)
-
-for message in consumer:
- print(message)
-
-for message in consumer.get_messages(count=5, block=True, timeout=4):
- print(message)
-```
-
-## Low level
-
-```python
-from kafka import KafkaClient, create_message
-from kafka.protocol import KafkaProtocol
-from kafka.common import ProduceRequest
-
-kafka = KafkaClient("localhost:9092")
-
-req = ProduceRequest(topic="my-topic", partition=1,
- messages=[create_message("some message")])
-resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
-kafka.close()
-
-resps[0].topic # "my-topic"
-resps[0].partition # 1
-resps[0].error # 0 (hopefully)
-resps[0].offset # offset of the first message sent in this request
-```
-
-# Install
-
-Install with your favorite package manager
-
-## Latest Release
-Pip:
-
-```shell
-pip install kafka-python
-```
-
-Releases are also listed at https://github.com/mumrah/kafka-python/releases
-
-
-## Bleeding-Edge
-```shell
-git clone https://github.com/mumrah/kafka-python
-pip install ./kafka-python
-```
-
-Setuptools:
-```shell
-git clone https://github.com/mumrah/kafka-python
-easy_install ./kafka-python
-```
-
-Using `setup.py` directly:
-```shell
-git clone https://github.com/mumrah/kafka-python
-cd kafka-python
-python setup.py install
-```
-
-## Optional Snappy install
-
-### Install Development Libraries
-Download and build Snappy from http://code.google.com/p/snappy/downloads/list
-
-Ubuntu:
-```shell
-apt-get install libsnappy-dev
-```
-
-OSX:
-```shell
-brew install snappy
-```
-
-From Source:
-```shell
-wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
-tar xzvf snappy-1.0.5.tar.gz
-cd snappy-1.0.5
-./configure
-make
-sudo make install
-```
-
-### Install Python Module
-Install the `python-snappy` module
-```shell
-pip install python-snappy
-```
-
-# Tests
-
-## Run the unit tests
-
-```shell
-tox
-```
-
-## Run a subset of unit tests
-```shell
-# run protocol tests only
-tox -- -v test.test_protocol
-```
-
-```shell
-# test with pypy only
-tox -e pypy
-```
-
-```shell
-# Run only 1 test, and use python 2.7
-tox -e py27 -- -v --with-id --collect-only
-# pick a test number from the list like #102
-tox -e py27 -- -v --with-id 102
-```
-
-## Run the integration tests
-
-The integration tests will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
-
-First, get the kafka binaries for integration testing:
-```shell
-./build_integration.sh
-```
-By default, the build_integration.sh script will download binary
-distributions for all supported kafka versions.
-To test against the latest source build, set KAFKA_VERSION=trunk
-and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
-```shell
-SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
-```
-
-Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
-env variable to the server build you want to use for testing:
-```shell
-KAFKA_VERSION=0.8.0 tox
-KAFKA_VERSION=0.8.1 tox
-KAFKA_VERSION=0.8.1.1 tox
-KAFKA_VERSION=trunk tox
-```
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..5405f92
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,53 @@
+Kafka Python client
+------------------------
+.. image:: https://api.travis-ci.org/mumrah/kafka-python.png?branch=master
+ :target: https://travis-ci.org/mumrah/kafka-python
+ :alt: Build Status
+
+.. image:: https://coveralls.io/repos/mumrah/kafka-python/badge.svg?branch=master
+ :target: https://coveralls.io/r/mumrah/kafka-python?branch=master
+ :alt: Coverage Status
+
+.. image:: https://readthedocs.org/projects/kafka-python/badge/?version=latest
+ :target: http://kafka-python.readthedocs.org/en/latest/
+ :alt: Full documentation available on ReadTheDocs
+
+`Full documentation available on ReadTheDocs`_
+
+This module provides low-level protocol support for Apache Kafka as well as
+high-level consumer and producer classes. Request batching is supported by the
+protocol as well as broker-aware request routing. Gzip and Snappy compression
+is also supported for message sets.
+
+http://kafka.apache.org/
+
+On Freenode IRC at #kafka-python, as well as #apache-kafka
+
+For general discussion of kafka-client design and implementation (not python specific),
+see https://groups.google.com/forum/#!forum/kafka-clients
+
+License
+----------
+Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE`
+
+Status
+----------
+The current stable version of this package is `0.9.3`_ and is compatible with:
+
+Kafka broker versions
+
+- 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
+- 0.8.1.1
+- 0.8.1
+- 0.8.0
+
+Python versions
+
+- 2.6 (tested on 2.6.9)
+- 2.7 (tested on 2.7.9)
+- 3.3 (tested on 3.3.5)
+- 3.4 (tested on 3.4.2)
+- pypy (tested on pypy 2.4.0 / python 2.7.8)
+
+.. _Full documentation available on ReadTheDocs: http://kafka-python.readthedocs.org/en/latest/
+.. _0.9.3: https://github.com/mumrah/kafka-python/releases/tag/v0.9.3
diff --git a/VERSION b/VERSION
index e72b7b4..8caff32 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.9.3-dev
+0.9.4-dev
diff --git a/build_integration.sh b/build_integration.sh
index bb46e54..2b81745 100755
--- a/build_integration.sh
+++ b/build_integration.sh
@@ -1,11 +1,11 @@
#!/bin/bash
# Versions available for testing via binary distributions
-OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1"
+OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.0"
# Useful configuration vars, with sensible defaults
if [ -z "$SCALA_VERSION" ]; then
- SCALA_VERSION=2.8.0
+ SCALA_VERSION=2.10
fi
# On travis CI, empty KAFKA_VERSION means skip integration tests
@@ -45,12 +45,18 @@ pushd servers
echo "-------------------------------------"
echo "Checking kafka binaries for ${kafka}"
echo
- wget -N https://archive.apache.org/dist/kafka/$kafka/kafka_${SCALA_VERSION}-${kafka}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/kafka_${SCALA_VERSION}-${kafka}.tar.gz
+ # kafka 0.8.0 is only available w/ scala 2.8.0
+ if [ "$kafka" == "0.8.0" ]; then
+ KAFKA_ARTIFACT="kafka_2.8.0-${kafka}"
+ else
+ KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}"
+ fi
+ wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tgz || wget -N https://archive.apache.org/dist/kafka/$kafka/${KAFKA_ARTIFACT}.tar.gz
echo
if [ ! -d "../$kafka/kafka-bin" ]; then
echo "Extracting kafka binaries for ${kafka}"
- tar xzvf kafka_${SCALA_VERSION}-${kafka}.t* -C ../$kafka/
- mv ../$kafka/kafka_${SCALA_VERSION}-${kafka} ../$kafka/kafka-bin
+ tar xzvf ${KAFKA_ARTIFACT}.t* -C ../$kafka/
+ mv ../$kafka/${KAFKA_ARTIFACT} ../$kafka/kafka-bin
else
echo "$kafka/kafka-bin directory already exists -- skipping tgz extraction"
fi
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..5751f68
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1,177 @@
+# Makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS =
+SPHINXBUILD = sphinx-build
+PAPER =
+BUILDDIR = _build
+
+# User-friendly check for sphinx-build
+ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
+$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/)
+endif
+
+# Internal variables.
+PAPEROPT_a4 = -D latex_paper_size=a4
+PAPEROPT_letter = -D latex_paper_size=letter
+ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+# the i18n builder cannot share the environment and doctrees with the others
+I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+
+.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext
+
+help:
+ @echo "Please use \`make <target>' where <target> is one of"
+ @echo " html to make standalone HTML files"
+ @echo " dirhtml to make HTML files named index.html in directories"
+ @echo " singlehtml to make a single large HTML file"
+ @echo " pickle to make pickle files"
+ @echo " json to make JSON files"
+ @echo " htmlhelp to make HTML files and a HTML help project"
+ @echo " qthelp to make HTML files and a qthelp project"
+ @echo " devhelp to make HTML files and a Devhelp project"
+ @echo " epub to make an epub"
+ @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
+ @echo " latexpdf to make LaTeX files and run them through pdflatex"
+ @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
+ @echo " text to make text files"
+ @echo " man to make manual pages"
+ @echo " texinfo to make Texinfo files"
+ @echo " info to make Texinfo files and run them through makeinfo"
+ @echo " gettext to make PO message catalogs"
+ @echo " changes to make an overview of all changed/added/deprecated items"
+ @echo " xml to make Docutils-native XML files"
+ @echo " pseudoxml to make pseudoxml-XML files for display purposes"
+ @echo " linkcheck to check all external links for integrity"
+ @echo " doctest to run all doctests embedded in the documentation (if enabled)"
+
+clean:
+ rm -rf $(BUILDDIR)/*
+
+html:
+ $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
+
+dirhtml:
+ $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
+
+singlehtml:
+ $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
+ @echo
+ @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
+
+pickle:
+ $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
+ @echo
+ @echo "Build finished; now you can process the pickle files."
+
+json:
+ $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
+ @echo
+ @echo "Build finished; now you can process the JSON files."
+
+htmlhelp:
+ $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
+ @echo
+ @echo "Build finished; now you can run HTML Help Workshop with the" \
+ ".hhp project file in $(BUILDDIR)/htmlhelp."
+
+qthelp:
+ $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
+ @echo
+ @echo "Build finished; now you can run "qcollectiongenerator" with the" \
+ ".qhcp project file in $(BUILDDIR)/qthelp, like this:"
+ @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp"
+ @echo "To view the help file:"
+ @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc"
+
+devhelp:
+ $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
+ @echo
+ @echo "Build finished."
+ @echo "To view the help file:"
+ @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python"
+ @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python"
+ @echo "# devhelp"
+
+epub:
+ $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
+ @echo
+ @echo "Build finished. The epub file is in $(BUILDDIR)/epub."
+
+latex:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo
+ @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
+ @echo "Run \`make' in that directory to run these through (pdf)latex" \
+ "(use \`make latexpdf' here to do that automatically)."
+
+latexpdf:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo "Running LaTeX files through pdflatex..."
+ $(MAKE) -C $(BUILDDIR)/latex all-pdf
+ @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
+
+latexpdfja:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo "Running LaTeX files through platex and dvipdfmx..."
+ $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
+ @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
+
+text:
+ $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
+ @echo
+ @echo "Build finished. The text files are in $(BUILDDIR)/text."
+
+man:
+ $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
+ @echo
+ @echo "Build finished. The manual pages are in $(BUILDDIR)/man."
+
+texinfo:
+ $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
+ @echo
+ @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
+ @echo "Run \`make' in that directory to run these through makeinfo" \
+ "(use \`make info' here to do that automatically)."
+
+info:
+ $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
+ @echo "Running Texinfo files through makeinfo..."
+ make -C $(BUILDDIR)/texinfo info
+ @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
+
+gettext:
+ $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
+ @echo
+ @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
+
+changes:
+ $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
+ @echo
+ @echo "The overview file is in $(BUILDDIR)/changes."
+
+linkcheck:
+ $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
+ @echo
+ @echo "Link check complete; look for any errors in the above output " \
+ "or in $(BUILDDIR)/linkcheck/output.txt."
+
+doctest:
+ $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
+ @echo "Testing of doctests in the sources finished, look at the " \
+ "results in $(BUILDDIR)/doctest/output.txt."
+
+xml:
+ $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
+ @echo
+ @echo "Build finished. The XML files are in $(BUILDDIR)/xml."
+
+pseudoxml:
+ $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
+ @echo
+ @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
diff --git a/docs/apidoc/kafka.consumer.rst b/docs/apidoc/kafka.consumer.rst
new file mode 100644
index 0000000..8595f99
--- /dev/null
+++ b/docs/apidoc/kafka.consumer.rst
@@ -0,0 +1,46 @@
+kafka.consumer package
+======================
+
+Submodules
+----------
+
+kafka.consumer.base module
+--------------------------
+
+.. automodule:: kafka.consumer.base
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.consumer.kafka module
+---------------------------
+
+.. automodule:: kafka.consumer.kafka
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.consumer.multiprocess module
+----------------------------------
+
+.. automodule:: kafka.consumer.multiprocess
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.consumer.simple module
+----------------------------
+
+.. automodule:: kafka.consumer.simple
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+
+Module contents
+---------------
+
+.. automodule:: kafka.consumer
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/apidoc/kafka.partitioner.rst b/docs/apidoc/kafka.partitioner.rst
new file mode 100644
index 0000000..ea215f1
--- /dev/null
+++ b/docs/apidoc/kafka.partitioner.rst
@@ -0,0 +1,38 @@
+kafka.partitioner package
+=========================
+
+Submodules
+----------
+
+kafka.partitioner.base module
+-----------------------------
+
+.. automodule:: kafka.partitioner.base
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.partitioner.hashed module
+-------------------------------
+
+.. automodule:: kafka.partitioner.hashed
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.partitioner.roundrobin module
+-----------------------------------
+
+.. automodule:: kafka.partitioner.roundrobin
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+
+Module contents
+---------------
+
+.. automodule:: kafka.partitioner
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/apidoc/kafka.producer.rst b/docs/apidoc/kafka.producer.rst
new file mode 100644
index 0000000..bd850bb
--- /dev/null
+++ b/docs/apidoc/kafka.producer.rst
@@ -0,0 +1,38 @@
+kafka.producer package
+======================
+
+Submodules
+----------
+
+kafka.producer.base module
+--------------------------
+
+.. automodule:: kafka.producer.base
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.producer.keyed module
+---------------------------
+
+.. automodule:: kafka.producer.keyed
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.producer.simple module
+----------------------------
+
+.. automodule:: kafka.producer.simple
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+
+Module contents
+---------------
+
+.. automodule:: kafka.producer
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/apidoc/kafka.rst b/docs/apidoc/kafka.rst
new file mode 100644
index 0000000..eb04c35
--- /dev/null
+++ b/docs/apidoc/kafka.rst
@@ -0,0 +1,79 @@
+kafka package
+=============
+
+Subpackages
+-----------
+
+.. toctree::
+
+ kafka.consumer
+ kafka.partitioner
+ kafka.producer
+
+Submodules
+----------
+
+kafka.client module
+-------------------
+
+.. automodule:: kafka.client
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.codec module
+------------------
+
+.. automodule:: kafka.codec
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.common module
+-------------------
+
+.. automodule:: kafka.common
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.conn module
+-----------------
+
+.. automodule:: kafka.conn
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.context module
+--------------------
+
+.. automodule:: kafka.context
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.protocol module
+---------------------
+
+.. automodule:: kafka.protocol
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+kafka.util module
+-----------------
+
+.. automodule:: kafka.util
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+
+Module contents
+---------------
+
+.. automodule:: kafka
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst
new file mode 100644
index 0000000..db3e580
--- /dev/null
+++ b/docs/apidoc/modules.rst
@@ -0,0 +1,7 @@
+kafka
+=====
+
+.. toctree::
+ :maxdepth: 4
+
+ kafka
diff --git a/docs/conf.py b/docs/conf.py
new file mode 100644
index 0000000..ea223c2
--- /dev/null
+++ b/docs/conf.py
@@ -0,0 +1,272 @@
+# -*- coding: utf-8 -*-
+#
+# kafka-python documentation build configuration file, created by
+# sphinx-quickstart on Sun Jan 4 12:21:50 2015.
+#
+# This file is execfile()d with the current directory set to its
+# containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+import sys
+import os
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#sys.path.insert(0, os.path.abspath('.'))
+
+# -- General configuration ------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+ 'sphinx.ext.autodoc',
+ 'sphinx.ext.intersphinx',
+ 'sphinx.ext.viewcode',
+ 'sphinxcontrib.napoleon',
+]
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix of source filenames.
+source_suffix = '.rst'
+
+# The encoding of source files.
+#source_encoding = 'utf-8-sig'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = u'kafka-python'
+copyright = u'2015, David Arthur'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+with open('../VERSION') as version_file:
+ version = version_file.read()
+
+# The full version, including alpha/beta/rc tags.
+release = version
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#language = None
+
+# There are two options for replacing |today|: either, you set today to some
+# non-false value, then it is used:
+#today = ''
+# Else, today_fmt is used as the format for a strftime call.
+#today_fmt = '%B %d, %Y'
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+exclude_patterns = ['_build']
+
+# The reST default role (used for this markup: `text`) to use for all
+# documents.
+#default_role = None
+
+# If true, '()' will be appended to :func: etc. cross-reference text.
+#add_function_parentheses = True
+
+# If true, the current module name will be prepended to all description
+# unit titles (such as .. function::).
+#add_module_names = True
+
+# If true, sectionauthor and moduleauthor directives will be shown in the
+# output. They are ignored by default.
+#show_authors = False
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# A list of ignored prefixes for module index sorting.
+#modindex_common_prefix = []
+
+# If true, keep warnings as "system message" paragraphs in the built documents.
+#keep_warnings = False
+
+
+# -- Options for HTML output ----------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+html_theme = 'default'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#html_theme_options = {}
+
+# Add any paths that contain custom themes here, relative to this directory.
+#html_theme_path = []
+
+# The name for this set of Sphinx documents. If None, it defaults to
+# "<project> v<release> documentation".
+#html_title = None
+
+# A shorter title for the navigation bar. Default is the same as html_title.
+#html_short_title = None
+
+# The name of an image file (relative to this directory) to place at the top
+# of the sidebar.
+#html_logo = None
+
+# The name of an image file (within the static path) to use as favicon of the
+# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
+# pixels large.
+#html_favicon = None
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# Add any extra paths that contain custom files (such as robots.txt or
+# .htaccess) here, relative to this directory. These files are copied
+# directly to the root of the documentation.
+#html_extra_path = []
+
+# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
+# using the given strftime format.
+#html_last_updated_fmt = '%b %d, %Y'
+
+# If true, SmartyPants will be used to convert quotes and dashes to
+# typographically correct entities.
+#html_use_smartypants = True
+
+# Custom sidebar templates, maps document names to template names.
+#html_sidebars = {}
+
+# Additional templates that should be rendered to pages, maps page names to
+# template names.
+#html_additional_pages = {}
+
+# If false, no module index is generated.
+#html_domain_indices = True
+
+# If false, no index is generated.
+#html_use_index = True
+
+# If true, the index is split into individual pages for each letter.
+#html_split_index = False
+
+# If true, links to the reST sources are added to the pages.
+#html_show_sourcelink = True
+
+# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
+#html_show_sphinx = True
+
+# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
+#html_show_copyright = True
+
+# If true, an OpenSearch description file will be output, and all pages will
+# contain a <link> tag referring to it. The value of this option must be the
+# base URL from which the finished HTML is served.
+#html_use_opensearch = ''
+
+# This is the file name suffix for HTML files (e.g. ".xhtml").
+#html_file_suffix = None
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'kafka-pythondoc'
+
+
+# -- Options for LaTeX output ---------------------------------------------
+
+latex_elements = {
+# The paper size ('letterpaper' or 'a4paper').
+#'papersize': 'letterpaper',
+
+# The font size ('10pt', '11pt' or '12pt').
+#'pointsize': '10pt',
+
+# Additional stuff for the LaTeX preamble.
+#'preamble': '',
+}
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title,
+# author, documentclass [howto, manual, or own class]).
+latex_documents = [
+ ('index', 'kafka-python.tex', u'kafka-python Documentation',
+ u'David Arthur', 'manual'),
+]
+
+# The name of an image file (relative to this directory) to place at the top of
+# the title page.
+#latex_logo = None
+
+# For "manual" documents, if this is true, then toplevel headings are parts,
+# not chapters.
+#latex_use_parts = False
+
+# If true, show page references after internal links.
+#latex_show_pagerefs = False
+
+# If true, show URL addresses after external links.
+#latex_show_urls = False
+
+# Documents to append as an appendix to all manuals.
+#latex_appendices = []
+
+# If false, no module index is generated.
+#latex_domain_indices = True
+
+
+# -- Options for manual page output ---------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+ ('index', 'kafka-python', u'kafka-python Documentation',
+ [u'David Arthur'], 1)
+]
+
+# If true, show URL addresses after external links.
+#man_show_urls = False
+
+
+# -- Options for Texinfo output -------------------------------------------
+
+# Grouping the document tree into Texinfo files. List of tuples
+# (source start file, target name, title, author,
+# dir menu entry, description, category)
+texinfo_documents = [
+ ('index', 'kafka-python', u'kafka-python Documentation',
+ u'David Arthur', 'kafka-python', 'One line description of project.',
+ 'Miscellaneous'),
+]
+
+# Documents to append as an appendix to all manuals.
+#texinfo_appendices = []
+
+# If false, no module index is generated.
+#texinfo_domain_indices = True
+
+# How to display URL addresses: 'footnote', 'no', or 'inline'.
+#texinfo_show_urls = 'footnote'
+
+# If true, do not generate a @detailmenu in the "Top" node's menu.
+#texinfo_no_detailmenu = False
+
+on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
+
+if not on_rtd: # only import and set the theme if we're building docs locally
+ import sphinx_rtd_theme
+ html_theme = 'sphinx_rtd_theme'
+ html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
diff --git a/docs/index.rst b/docs/index.rst
new file mode 100644
index 0000000..e4a9ac7
--- /dev/null
+++ b/docs/index.rst
@@ -0,0 +1,58 @@
+kafka-python
+============
+
+This module provides low-level protocol support for Apache Kafka as well as
+high-level consumer and producer classes. Request batching is supported by the
+protocol as well as broker-aware request routing. Gzip and Snappy compression
+is also supported for message sets.
+
+http://kafka.apache.org/
+
+On Freenode IRC at #kafka-python, as well as #apache-kafka
+
+For general discussion of kafka-client design and implementation (not python specific),
+see https://groups.google.com/forum/m/#!forum/kafka-clients
+
+Status
+------
+
+The current stable version of this package is `0.9.3 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.3>`_ and is compatible with:
+
+Kafka broker versions
+
+* 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
+* 0.8.1.1
+* 0.8.1
+* 0.8.0
+
+Python versions
+
+* 2.6 (tested on 2.6.9)
+* 2.7 (tested on 2.7.9)
+* 3.3 (tested on 3.3.5)
+* 3.4 (tested on 3.4.2)
+* pypy (tested on pypy 2.4.0 / python 2.7.8)
+
+License
+-------
+
+Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE <https://github.com/mumrah/kafka-python/blob/master/LICENSE>`_.
+
+
+Contents
+--------
+
+.. toctree::
+ :maxdepth: 2
+
+ install
+ tests
+ usage
+ API reference </apidoc/modules>
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/docs/install.rst b/docs/install.rst
new file mode 100644
index 0000000..1dd6d4e
--- /dev/null
+++ b/docs/install.rst
@@ -0,0 +1,79 @@
+Install
+=======
+
+Install with your favorite package manager
+
+Latest Release
+--------------
+Pip:
+
+.. code:: bash
+
+ pip install kafka-python
+
+Releases are also listed at https://github.com/mumrah/kafka-python/releases
+
+
+Bleeding-Edge
+-------------
+
+.. code:: bash
+
+ git clone https://github.com/mumrah/kafka-python
+ pip install ./kafka-python
+
+Setuptools:
+
+.. code:: bash
+
+ git clone https://github.com/mumrah/kafka-python
+ easy_install ./kafka-python
+
+Using `setup.py` directly:
+
+.. code:: bash
+
+ git clone https://github.com/mumrah/kafka-python
+ cd kafka-python
+ python setup.py install
+
+
+Optional Snappy install
+-----------------------
+
+Install Development Libraries
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Download and build Snappy from http://code.google.com/p/snappy/downloads/list
+
+Ubuntu:
+
+.. code:: bash
+
+ apt-get install libsnappy-dev
+
+OSX:
+
+.. code:: bash
+
+ brew install snappy
+
+From Source:
+
+.. code:: bash
+
+ wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
+ tar xzvf snappy-1.0.5.tar.gz
+ cd snappy-1.0.5
+ ./configure
+ make
+ sudo make install
+
+Install Python Module
+^^^^^^^^^^^^^^^^^^^^^
+
+Install the `python-snappy` module
+
+.. code:: bash
+
+ pip install python-snappy
diff --git a/docs/make.bat b/docs/make.bat
new file mode 100644
index 0000000..2e9d7dc
--- /dev/null
+++ b/docs/make.bat
@@ -0,0 +1,242 @@
+@ECHO OFF
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set BUILDDIR=_build
+set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
+set I18NSPHINXOPTS=%SPHINXOPTS% .
+if NOT "%PAPER%" == "" (
+ set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
+ set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
+)
+
+if "%1" == "" goto help
+
+if "%1" == "help" (
+ :help
+ echo.Please use `make ^<target^>` where ^<target^> is one of
+ echo. html to make standalone HTML files
+ echo. dirhtml to make HTML files named index.html in directories
+ echo. singlehtml to make a single large HTML file
+ echo. pickle to make pickle files
+ echo. json to make JSON files
+ echo. htmlhelp to make HTML files and a HTML help project
+ echo. qthelp to make HTML files and a qthelp project
+ echo. devhelp to make HTML files and a Devhelp project
+ echo. epub to make an epub
+ echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
+ echo. text to make text files
+ echo. man to make manual pages
+ echo. texinfo to make Texinfo files
+ echo. gettext to make PO message catalogs
+ echo. changes to make an overview over all changed/added/deprecated items
+ echo. xml to make Docutils-native XML files
+ echo. pseudoxml to make pseudoxml-XML files for display purposes
+ echo. linkcheck to check all external links for integrity
+ echo. doctest to run all doctests embedded in the documentation if enabled
+ goto end
+)
+
+if "%1" == "clean" (
+ for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
+ del /q /s %BUILDDIR%\*
+ goto end
+)
+
+
+%SPHINXBUILD% 2> nul
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.http://sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "html" (
+ %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/html.
+ goto end
+)
+
+if "%1" == "dirhtml" (
+ %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
+ goto end
+)
+
+if "%1" == "singlehtml" (
+ %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
+ goto end
+)
+
+if "%1" == "pickle" (
+ %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the pickle files.
+ goto end
+)
+
+if "%1" == "json" (
+ %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the JSON files.
+ goto end
+)
+
+if "%1" == "htmlhelp" (
+ %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run HTML Help Workshop with the ^
+.hhp project file in %BUILDDIR%/htmlhelp.
+ goto end
+)
+
+if "%1" == "qthelp" (
+ %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^
+.qhcp project file in %BUILDDIR%/qthelp, like this:
+ echo.^> qcollectiongenerator %BUILDDIR%\qthelp\kafka-python.qhcp
+ echo.To view the help file:
+ echo.^> assistant -collectionFile %BUILDDIR%\qthelp\kafka-python.ghc
+ goto end
+)
+
+if "%1" == "devhelp" (
+ %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished.
+ goto end
+)
+
+if "%1" == "epub" (
+ %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The epub file is in %BUILDDIR%/epub.
+ goto end
+)
+
+if "%1" == "latex" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdf" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdfja" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf-ja
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "text" (
+ %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The text files are in %BUILDDIR%/text.
+ goto end
+)
+
+if "%1" == "man" (
+ %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The manual pages are in %BUILDDIR%/man.
+ goto end
+)
+
+if "%1" == "texinfo" (
+ %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
+ goto end
+)
+
+if "%1" == "gettext" (
+ %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
+ goto end
+)
+
+if "%1" == "changes" (
+ %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.The overview file is in %BUILDDIR%/changes.
+ goto end
+)
+
+if "%1" == "linkcheck" (
+ %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Link check complete; look for any errors in the above output ^
+or in %BUILDDIR%/linkcheck/output.txt.
+ goto end
+)
+
+if "%1" == "doctest" (
+ %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Testing of doctests in the sources finished, look at the ^
+results in %BUILDDIR%/doctest/output.txt.
+ goto end
+)
+
+if "%1" == "xml" (
+ %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The XML files are in %BUILDDIR%/xml.
+ goto end
+)
+
+if "%1" == "pseudoxml" (
+ %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
+ goto end
+)
+
+:end
diff --git a/docs/requirements.txt b/docs/requirements.txt
new file mode 100644
index 0000000..86b4f05
--- /dev/null
+++ b/docs/requirements.txt
@@ -0,0 +1,7 @@
+sphinx
+sphinxcontrib-napoleon
+
+# Install kafka-python in editable mode
+# This allows the sphinx autodoc module
+# to load the Python modules and extract docstrings.
+# -e ..
diff --git a/docs/tests.rst b/docs/tests.rst
new file mode 100644
index 0000000..df9a3ef
--- /dev/null
+++ b/docs/tests.rst
@@ -0,0 +1,59 @@
+Tests
+=====
+
+Run the unit tests
+------------------
+
+.. code:: bash
+
+ tox
+
+
+Run a subset of unit tests
+--------------------------
+
+.. code:: bash
+
+ # run protocol tests only
+ tox -- -v test.test_protocol
+
+ # test with pypy only
+ tox -e pypy
+
+ # Run only 1 test, and use python 2.7
+ tox -e py27 -- -v --with-id --collect-only
+
+ # pick a test number from the list like #102
+ tox -e py27 -- -v --with-id 102
+
+
+Run the integration tests
+-------------------------
+
+The integration tests will actually start up a real local Zookeeper
+instance and Kafka brokers, and send messages in using the client.
+
+First, get the kafka binaries for integration testing:
+
+.. code:: bash
+
+ ./build_integration.sh
+
+By default, the build_integration.sh script will download binary
+distributions for all supported kafka versions.
+To test against the latest source build, set KAFKA_VERSION=trunk
+and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
+
+.. code:: bash
+
+ SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
+
+To run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
+env variable to the server build you want to use for testing:
+
+.. code:: bash
+
+ KAFKA_VERSION=0.8.0 tox
+ KAFKA_VERSION=0.8.1 tox
+ KAFKA_VERSION=0.8.1.1 tox
+ KAFKA_VERSION=trunk tox
diff --git a/docs/usage.rst b/docs/usage.rst
new file mode 100644
index 0000000..141cf93
--- /dev/null
+++ b/docs/usage.rst
@@ -0,0 +1,124 @@
+Usage
+=====
+
+High level
+----------
+
+.. code:: python
+
+ from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+
+ # To send messages synchronously
+ kafka = KafkaClient("localhost:9092")
+ producer = SimpleProducer(kafka)
+
+ # Note that the application is responsible for encoding messages to type str
+ producer.send_messages("my-topic", "some message")
+ producer.send_messages("my-topic", "this method", "is variadic")
+
+ # Send unicode message
+ producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+
+ # To send messages asynchronously
+ # WARNING: current implementation does not guarantee message delivery on failure!
+ # messages can get dropped! Use at your own risk! Or help us improve with a PR!
+ producer = SimpleProducer(kafka, async=True)
+ producer.send_messages("my-topic", "async message")
+
+ # To wait for acknowledgements
+ # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
+ # a local log before sending response
+ # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+ # by all in sync replicas before sending a response
+ producer = SimpleProducer(kafka, async=False,
+ req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=2000)
+
+ response = producer.send_messages("my-topic", "another message")
+
+ if response:
+ print(response[0].error)
+ print(response[0].offset)
+
+ # To send messages in batch. You can use any of the available
+ # producers for doing this. The following producer will collect
+ # messages in batch and send them to Kafka after 20 messages are
+ # collected or every 60 seconds
+ # Notes:
+ # * If the producer dies before the messages are sent, there will be losses
+ # * Call producer.stop() to send the messages and cleanup
+ producer = SimpleProducer(kafka, batch_send=True,
+ batch_send_every_n=20,
+ batch_send_every_t=60)
+
+ # To consume messages
+ consumer = KafkaConsumer("my-topic", group_id="my_group",
+ metadata_broker_list=["localhost:9092"])
+ for message in consumer:
+ # message is raw byte string -- decode if necessary!
+ # e.g., for unicode: `message.decode('utf-8')`
+ print(message)
+
+ kafka.close()
+
+
+Keyed messages
+--------------
+
+.. code:: python
+
+ from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
+ RoundRobinPartitioner)
+
+ kafka = KafkaClient("localhost:9092")
+
+ # HashedPartitioner is default
+ producer = KeyedProducer(kafka)
+ producer.send("my-topic", "key1", "some message")
+ producer.send("my-topic", "key2", "this methode")
+
+ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+
+Multiprocess consumer
+---------------------
+
+.. code:: python
+
+ from kafka import KafkaClient, MultiProcessConsumer
+
+ kafka = KafkaClient("localhost:9092")
+
+ # This will split the number of partitions among two processes
+ consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
+
+ # This will spawn processes such that each handles 2 partitions max
+ consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
+ partitions_per_proc=2)
+
+ for message in consumer:
+ print(message)
+
+ for message in consumer.get_messages(count=5, block=True, timeout=4):
+ print(message)
+
+Low level
+---------
+
+.. code:: python
+
+ from kafka import KafkaClient, create_message
+ from kafka.protocol import KafkaProtocol
+ from kafka.common import ProduceRequest
+
+ kafka = KafkaClient("localhost:9092")
+
+ req = ProduceRequest(topic="my-topic", partition=1,
+ messages=[create_message("some message")])
+ resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
+ kafka.close()
+
+ resps[0].topic # "my-topic"
+ resps[0].partition # 1
+ resps[0].error # 0 (hopefully)
+ resps[0].offset # offset of the first message sent in this request
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 8ccdb4c..3536084 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -4,7 +4,7 @@ import pkg_resources
__version__ = pkg_resources.require('kafka-python')[0].version
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
-__copyright__ = 'Copyright 2014, David Arthur under Apache License, v2.0'
+__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0'
from kafka.client import KafkaClient
from kafka.conn import KafkaConnection
diff --git a/kafka/client.py b/kafka/client.py
index 7a0cf18..48a534e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,6 +15,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata,
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
+from kafka.util import kafka_bytestring
log = logging.getLogger("kafka")
@@ -30,7 +31,7 @@ class KafkaClient(object):
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.client_id = client_id
+ self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
@@ -85,7 +86,7 @@ class KafkaClient(object):
self.load_metadata_for_topics(topic)
# If the partition doesn't actually exist, raise
- if partition not in self.topic_partitions[topic]:
+ if partition not in self.topic_partitions.get(topic, []):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
@@ -131,19 +132,21 @@ class KafkaClient(object):
the leader broker for that partition using the supplied encode/decode
functions
- Params
- ======
+ Arguments:
+
payloads: list of object-like entities with a topic (str) and
- partition (int) attribute
+ partition (int) attribute
+
encode_fn: a method to encode the list of payloads to a request body,
- must accept client_id, correlation_id, and payloads as
- keyword arguments
+ must accept client_id, correlation_id, and payloads as
+ keyword arguments
+
decode_fn: a method to decode a response body into response objects.
- The response objects must be object-like and have topic
- and partition attributes
+ The response objects must be object-like and have topic
+ and partition attributes
+
+ Returns:
- Return
- ======
List of response objects in the same order as the supplied payloads
"""
@@ -175,8 +178,13 @@ class KafkaClient(object):
# Send the request, recv the response
try:
conn.send(requestId, request)
+
+ # decoder_fn=None signal that the server is expected to not
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
if decoder_fn is None:
continue
+
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -257,9 +265,9 @@ class KafkaClient(object):
def get_partition_ids_for_topic(self, topic):
if topic not in self.topic_partitions:
- return None
+ return []
- return list(self.topic_partitions[topic])
+ return sorted(list(self.topic_partitions[topic]))
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
@@ -285,9 +293,9 @@ class KafkaClient(object):
This method should be called after receiving any error
- @param: *topics (optional)
- If a list of topics is provided, the metadata refresh will be limited
- to the specified topics only.
+ Arguments:
+ *topics (optional): If a list of topics is provided,
+ the metadata refresh will be limited to the specified topics only.
Exceptions:
----------
@@ -384,18 +392,16 @@ class KafkaClient(object):
sent to a specific broker. Output is a list of responses in the
same order as the list of payloads specified
- Params
- ======
- payloads: list of ProduceRequest
- fail_on_error: boolean, should we raise an Exception if we
- encounter an API error?
- callback: function, instead of returning the ProduceResponse,
- first pass it through this function
-
- Return
- ======
- list of ProduceResponse or callback(ProduceResponse), in the
- order of input payloads
+ Arguments:
+ payloads: list of ProduceRequest
+ fail_on_error: boolean, should we raise an Exception if we
+ encounter an API error?
+ callback: function, instead of returning the ProduceResponse,
+ first pass it through this function
+
+ Returns:
+ list of ProduceResponse or callback(ProduceResponse), in the
+ order of input payloads
"""
encoder = functools.partial(
diff --git a/kafka/conn.py b/kafka/conn.py
index ddfee8b..30debec 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -47,10 +47,11 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
- host: the host name or IP address of a kafka broker
- port: the port number the kafka broker is listening on
- timeout: default 120. The socket timeout for sending and receiving data
- in seconds. None means no timeout, so a request can block forever.
+ Arguments:
+ host: the host name or IP address of a kafka broker
+ port: the port number the kafka broker is listening on
+ timeout: default 120. The socket timeout for sending and receiving data
+ in seconds. None means no timeout, so a request can block forever.
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
@@ -116,8 +117,10 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"""
Send a request to Kafka
- param: request_id -- can be any int (used only for debug logging...)
- param: payload -- an encoded kafka packet (see KafkaProtocol)
+
+ Arguments:
+ request_id (int): can be any int (used only for debug logging...)
+ payload: an encoded kafka packet (see KafkaProtocol)
"""
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
@@ -135,8 +138,12 @@ class KafkaConnection(local):
def recv(self, request_id):
"""
Get a response packet from Kafka
- param: request_id -- can be any int (only used for debug logging...)
- returns encoded kafka packet response from server as type str
+
+ Arguments:
+ request_id: can be any int (only used for debug logging...)
+
+ Returns:
+ str: Encoded kafka packet response from server
"""
log.debug("Reading response %d from Kafka" % request_id)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 2464aaf..9cdcf89 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -32,9 +32,11 @@ class Consumer(object):
Base class to be used by other consumers. Not to be used directly
This base class provides logic for
+
* initialization and fetching metadata of partitions
* Auto-commit logic
* APIs for fetching pending message count
+
"""
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
@@ -93,8 +95,9 @@ class Consumer(object):
"""
Commit offsets for this consumer
- partitions: list of partitions to commit, default is to commit
- all of them
+ Keyword Arguments:
+ partitions (list): list of partitions to commit, default is to commit
+ all of them
"""
# short circuit if nothing happened. This check is kept outside
@@ -148,7 +151,8 @@ class Consumer(object):
"""
Gets the pending message count
- partitions: list of partitions to check for, default is to check all
+ Keyword Arguments:
+ partitions (list): list of partitions to check for, default is to check all
"""
if not partitions:
partitions = self.offsets.keys()
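
A minimal sketch of the keyword arguments documented above (assumes an existing SimpleConsumer instance named `consumer`; partition ids are illustrative):

.. code:: python

    consumer.commit()                    # commit offsets for all partitions
    consumer.commit(partitions=[0, 1])   # commit only partitions 0 and 1

    consumer.pending()                   # pending message count across all partitions
    consumer.pending(partitions=[0])     # pending count for partition 0 only
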
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index f16b526..49ffa7b 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -47,79 +47,86 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
-BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
-
class KafkaConsumer(object):
"""
A simpler kafka consumer
- ```
- # A very basic 'tail' consumer, with no stored offset management
- kafka = KafkaConsumer('topic1')
- for m in kafka:
- print m
-
- # Alternate interface: next()
- print kafka.next()
-
- # Alternate interface: batch iteration
- while True:
- for m in kafka.fetch_messages():
- print m
- print "Done with batch - let's do another!"
- ```
-
- ```
- # more advanced consumer -- multiple topics w/ auto commit offset management
- kafka = KafkaConsumer('topic1', 'topic2',
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in kafka:
- process_message(m)
- kafka.task_done(m)
-
- # Alternate interface: next()
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
-
- # If auto_commit_enable is False, remember to commit() periodically
- kafka.commit()
-
- # Batch process interface
- while True:
- for m in kafka.fetch_messages():
+ .. code:: python
+
+ # A very basic 'tail' consumer, with no stored offset management
+ kafka = KafkaConsumer('topic1',
+ metadata_broker_list=['localhost:9092'])
+ for m in kafka:
+ print m
+
+ # Alternate interface: next()
+ print kafka.next()
+
+ # Alternate interface: batch iteration
+ while True:
+ for m in kafka.fetch_messages():
+ print m
+ print "Done with batch - let's do another!"
+
+
+ .. code:: python
+
+ # more advanced consumer -- multiple topics w/ auto commit offset
+ # management
+ kafka = KafkaConsumer('topic1', 'topic2',
+ metadata_broker_list=['localhost:9092'],
+ group_id='my_consumer_group',
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30 * 1000,
+ auto_offset_reset='smallest')
+
+ # Infinite iteration
+ for m in kafka:
+ process_message(m)
+ kafka.task_done(m)
+
+ # Alternate interface: next()
+ m = kafka.next()
process_message(m)
kafka.task_done(m)
- ```
+
+ # If auto_commit_enable is False, remember to commit() periodically
+ kafka.commit()
+
+ # Batch process interface
+ while True:
+ for m in kafka.fetch_messages():
+ process_message(m)
+ kafka.task_done(m)
+
messages (m) are namedtuples with attributes:
- m.topic: topic name (str)
- m.partition: partition number (int)
- m.offset: message offset on topic-partition log (int)
- m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is raw bytes)
+
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to constructor,
otherwise defaults will be used:
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
+
+ .. code:: python
+
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ metadata_broker_list=None,
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
@@ -133,6 +140,9 @@ class KafkaConsumer(object):
"""
Configuration settings can be passed to constructor,
otherwise defaults will be used:
+
+ .. code:: python
+
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
@@ -159,13 +169,6 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
- # Handle str/bytes conversions
- for config_key in BYTES_CONFIGURATION_KEYS:
- if isinstance(self._config[config_key], six.string_types):
- logger.warning("Converting configuration key '%s' to bytes" %
- config_key)
- self._config[config_key] = self._config[config_key].encode('utf-8')
-
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
@@ -189,28 +192,35 @@ class KafkaConsumer(object):
Optionally specify offsets to start from
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict: { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
+
+ * str (utf-8): topic name (will consume all available partitions)
+ * tuple: (topic, partition)
+ * dict:
+ - { topic: partition }
+ - { topic: [partition list] }
+ - { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
- Ex:
- kafka = KafkaConsumer()
+ * tuple: (topic, partition, offset)
+ * dict: { (topic, partition): offset, ... }
+
+ Example:
+
+ .. code:: python
+
+ kafka = KafkaConsumer()
- # Consume topic1-all; topic2-partition2; topic3-partition0
- kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+ # Consume topic1-all; topic2-partition2; topic3-partition0
+ kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
- # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
- # using tuples --
- kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+ # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+ # using tuples --
+ kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+
+ # using dict --
+ kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
- # using dict --
- kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
"""
self._topics = []
self._client.load_metadata_for_topics()
@@ -309,10 +319,12 @@ class KafkaConsumer(object):
Otherwise blocks indefinitely
Note that this is also the method called internally during iteration:
- ```
- for m in consumer:
- pass
- ```
+
+ .. code:: python
+
+ for m in consumer:
+ pass
+
"""
self._set_consumer_timeout_start()
while True:
@@ -336,11 +348,12 @@ class KafkaConsumer(object):
OffsetOutOfRange, per the configured `auto_offset_reset` policy
Key configuration parameters:
- `fetch_message_max_bytes`
- `fetch_max_wait_ms`
- `fetch_min_bytes`
- `deserializer_class`
- `auto_offset_reset`
+
+ * `fetch_message_max_bytes`
+ * `fetch_max_wait_ms`
+ * `fetch_min_bytes`
+ * `deserializer_class`
+ * `auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
@@ -408,6 +421,10 @@ class KafkaConsumer(object):
offset, message.key,
self._config['deserializer_class'](message.value))
+ if offset < self._offsets.fetch[topic_partition]:
+ logger.debug('Skipping message %s because its offset is less than the consumer offset',
+ msg)
+ continue
# Only increment fetch offset if we safely got the message and deserialized
self._offsets.fetch[topic_partition] = offset + 1
@@ -418,20 +435,18 @@ class KafkaConsumer(object):
"""
Request available fetch offsets for a single topic/partition
- @param topic (str)
- @param partition (int)
- @param request_time_ms (int) -- Used to ask for all messages before a
- certain time (ms). There are two special
- values. Specify -1 to receive the latest
- offset (i.e. the offset of the next coming
- message) and -2 to receive the earliest
- available offset. Note that because offsets
- are pulled in descending order, asking for
- the earliest offset will always return you
- a single element.
- @param max_num_offsets (int)
-
- @return offsets (list)
+ Arguments:
+ topic (str)
+ partition (int)
+ request_time_ms (int): Used to ask for all messages before a
+ certain time (ms). There are two special values. Specify -1 to receive the latest
+ offset (i.e. the offset of the next coming message) and -2 to receive the earliest
+ available offset. Note that because offsets are pulled in descending order, asking for
+ the earliest offset will always return you a single element.
+ max_num_offsets (int)
+
+ Returns:
+ offsets (list)
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
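
A minimal sketch of the special request_time_ms values described above (assumes a configured KafkaConsumer named `consumer`; topic and partition are illustrative):

.. code:: python

    # -1 asks for the latest offset (the offset of the next message to arrive)
    (latest,) = consumer.get_partition_offsets(b'topic1', 0, -1, 1)

    # -2 asks for the earliest available offset
    (earliest,) = consumer.get_partition_offsets(b'topic1', 0, -2, 1)
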
@@ -448,9 +463,12 @@ class KafkaConsumer(object):
def offsets(self, group=None):
"""
- Returns a copy of internal offsets struct
- optional param: group [fetch|commit|task_done|highwater]
- if no group specified, returns all groups
+ Keyword Arguments:
+ group: Either "fetch", "commit", "task_done", or "highwater".
+ If no group specified, returns all groups.
+
+ Returns:
+ A copy of internal offsets struct
"""
if not group:
return {
@@ -498,8 +516,8 @@ class KafkaConsumer(object):
Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
- Note -- this functionality requires server version >=0.8.1.1
- see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ **Note**: this functionality requires server version >=0.8.1.1
+ See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
@@ -531,7 +549,7 @@ class KafkaConsumer(object):
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(self._config['group_id'],
+ resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)
@@ -595,7 +613,7 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
+ kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 912e64b..4dc04dc 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer):
A consumer implementation that consumes partitions for a topic in
parallel using multiple processes
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- num_procs: Number of processes to start for consuming messages.
- The available partitions will be divided among these processes
- partitions_per_proc: Number of partitions to be allocated per process
- (overrides num_procs)
+ Arguments:
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ Keyword Arguments:
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ num_procs: Number of processes to start for consuming messages.
+ The available partitions will be divided among these processes
+ partitions_per_proc: Number of partitions to be allocated per process
+ (overrides num_procs)
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer):
"""
Fetch the specified number of messages
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ Keyword Arguments:
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
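
A minimal sketch of the get_messages keyword arguments above (assumes a connected KafkaClient named `client`; group and topic names are illustrative):

.. code:: python

    from kafka import MultiProcessConsumer

    consumer = MultiProcessConsumer(client, b'my-group', b'my-topic', num_procs=2)

    # Block for up to 5 seconds, returning at most 100 messages
    for message in consumer.get_messages(count=100, block=True, timeout=5):
        print(message.message.value)   # OffsetAndMessage -> Message payload
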
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index df975f4..3d250ea 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -8,6 +8,7 @@ import logging
import time
import six
+import sys
try:
from Queue import Empty, Queue
@@ -16,7 +17,9 @@ except ImportError: # python 2
from kafka.common import (
FetchRequest, OffsetRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+ OffsetOutOfRangeError, check_error
)
from .base import (
Consumer,
@@ -67,24 +70,36 @@ class SimpleConsumer(Consumer):
A simple consumer implementation that consumes all/specified partitions
for a topic
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
- partitions: An optional list of partitions to consume the data from
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- fetch_size_bytes: number of bytes to request in a FetchRequest
- buffer_size: default 4K. Initial number of bytes to tell kafka we
- have available. This will double as needed.
- max_buffer_size: default 16K. Max number of bytes to tell kafka we have
- available. None means no limit.
- iter_timeout: default None. How much time (in seconds) to wait for a
- message in the iterator before exiting. None means no
- timeout, so it will wait forever.
+ Arguments:
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ Keyword Arguments:
+ partitions: An optional list of partitions to consume the data from
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ fetch_size_bytes: number of bytes to request in a FetchRequest
+
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
+
+ auto_offset_reset: default largest. Reset partition offsets upon
+ OffsetOutOfRangeError. Valid values are largest and smallest.
+ Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -98,7 +113,8 @@ class SimpleConsumer(Consumer):
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None):
+ iter_timeout=None,
+ auto_offset_reset='largest'):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -117,12 +133,38 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
+ self.auto_offset_reset = auto_offset_reset
self.queue = Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
+ def reset_partition_offset(self, partition):
+ LATEST = -1
+ EARLIEST = -2
+ if self.auto_offset_reset == 'largest':
+ reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+ elif self.auto_offset_reset == 'smallest':
+ reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+ else:
+            # Let's raise a reasonable exception type if the user calls
+            # this outside of an exception context
+ if sys.exc_info() == (None, None, None):
+ raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+ 'valid auto_offset_reset setting '
+ '(largest|smallest)')
+ # Otherwise we should re-raise the upstream exception
+ # b/c it typically includes additional data about
+ # the request that triggered it, and we do not want to drop that
+ raise
+
+ # send_offset_request
+ (resp, ) = self.client.send_offset_request(reqs)
+ check_error(resp)
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+
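
A minimal sketch of the new auto_offset_reset option (assumes a connected KafkaClient named `client`; the process() helper is illustrative):

.. code:: python

    from kafka import SimpleConsumer

    consumer = SimpleConsumer(client, b'my-group', b'my-topic',
                              auto_offset_reset='smallest')

    # On OffsetOutOfRangeError the consumer now resets to the earliest
    # available offset instead of propagating the error
    for message in consumer:
        process(message)
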
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -133,11 +175,13 @@ class SimpleConsumer(Consumer):
"""
Alter the current offset in the consumer, similar to fseek
- offset: how much to modify the offset
- whence: where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
+ Arguments:
+ offset: how much to modify the offset
+ whence: where to modify it from
+
+ * 0 is relative to the earliest available offset (head)
+ * 1 is relative to the current offset
+ * 2 is relative to the latest known offset (tail)
"""
if whence == 1: # relative to current position
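
A minimal sketch of the whence semantics documented above (assumes an existing SimpleConsumer named `consumer`):

.. code:: python

    consumer.seek(0, 0)     # jump to the earliest available offset (head)
    consumer.seek(5, 1)     # skip forward 5 messages from the current offset
    consumer.seek(-10, 2)   # position 10 messages before the latest offset (tail)
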
@@ -180,11 +224,12 @@ class SimpleConsumer(Consumer):
"""
Fetch the specified number of messages
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ Keyword Arguments:
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
if timeout is not None:
@@ -286,14 +331,35 @@ class SimpleConsumer(Consumer):
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes)
+ min_bytes=self.fetch_min_bytes,
+ fail_on_error=False
+ )
retry_partitions = {}
for resp in responses:
+
+ try:
+ check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+ self.client.reset_topic_metadata(resp.topic)
+ raise
+ except OffsetOutOfRangeError:
+ log.warning("OffsetOutOfRangeError for %s - %d. "
+ "Resetting partition offset...",
+ resp.topic, resp.partition)
+ self.reset_partition_offset(resp.partition)
+ # Retry this partition
+ retry_partitions[resp.partition] = partitions[resp.partition]
+ continue
+
partition = resp.partition
buffer_size = partitions[partition]
try:
for message in resp.messages:
+ if message.offset < self.fetch_offsets[partition]:
+ log.debug('Skipping message %s because its offset is less than the consumer offset',
+ message)
+ continue
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
diff --git a/kafka/context.py b/kafka/context.py
index 98ed7b3..ade4db8 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -18,6 +18,8 @@ class OffsetCommitContext(object):
Example:
+ .. code:: python
+
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()
@@ -57,7 +59,10 @@ class OffsetCommitContext(object):
In order to know the current partition, it is helpful to initialize
the consumer to provide partition info via:
+ .. code:: python
+
consumer.provide_partition_info()
+
"""
max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
index c62b7ed..857f634 100644
--- a/kafka/partitioner/base.py
+++ b/kafka/partitioner/base.py
@@ -7,17 +7,18 @@ class Partitioner(object):
"""
Initialize the partitioner
- partitions - A list of available partitions (during startup)
+ Arguments:
+ partitions: A list of available partitions (during startup)
"""
self.partitions = partitions
- def partition(self, key, partitions):
+ def partition(self, key, partitions=None):
"""
Takes a string key and num_partitions as argument and returns
a partition to be used for the message
- partitions - The list of partitions is passed in every call. This
- may look like an overhead, but it will be useful
- (in future) when we handle cases like rebalancing
+ Arguments:
+ key: the key to use for partitioning
+ partitions: (optional) a list of partitions.
"""
raise NotImplementedError('partition function has to be implemented')
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 587a3de..fb5e598 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -5,7 +5,9 @@ class HashedPartitioner(Partitioner):
Implements a partitioner which selects the target partition based on
the hash of the key
"""
- def partition(self, key, partitions):
+ def partition(self, key, partitions=None):
+ if not partitions:
+ partitions = self.partitions
size = len(partitions)
idx = hash(key) % size
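
A minimal sketch of the new optional partitions argument (partition ids and key are illustrative):

.. code:: python

    from kafka.partitioner.hashed import HashedPartitioner

    partitioner = HashedPartitioner([0, 1, 2])
    partitioner.partition(b'my-key')           # falls back to the partitions given at init
    partitioner.partition(b'my-key', [0, 1])   # an explicit list still takes precedence
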
diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py
index 54d00da..6439e53 100644
--- a/kafka/partitioner/roundrobin.py
+++ b/kafka/partitioner/roundrobin.py
@@ -15,9 +15,9 @@ class RoundRobinPartitioner(Partitioner):
self.partitions = partitions
self.iterpart = cycle(partitions)
- def partition(self, key, partitions):
+ def partition(self, key, partitions=None):
# Refresh the partition list if necessary
- if self.partitions != partitions:
+ if partitions and self.partitions != partitions:
self._set_partitions(partitions)
return next(self.iterpart)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6e19b92..695f195 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -85,20 +85,20 @@ class Producer(object):
"""
Base class to be used by producers
- Params:
- client - The Kafka client instance to use
- async - If set to true, the messages are sent asynchronously via another
+ Arguments:
+ client: The Kafka client instance to use
+ async: If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
WARNING!!! current implementation of async producer does not
guarantee message delivery. Use at your own risk! Or help us
improve with a PR!
- req_acks - A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
+ req_acks: A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
"""
ACK_NOT_REQUIRED = 0 # No ack is required
@@ -127,6 +127,7 @@ class Producer(object):
self.async = async
self.req_acks = req_acks
self.ack_timeout = ack_timeout
+ self.stopped = False
if codec is None:
codec = CODEC_NONE
@@ -212,3 +213,8 @@ class Producer(object):
if self.proc.is_alive():
self.proc.terminate()
+ self.stopped = True
+
+ def __del__(self):
+ if not self.stopped:
+ self.stop()
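
A minimal sketch of the new lifecycle behavior (assumes a connected KafkaClient named `client`):

.. code:: python

    from kafka import SimpleProducer

    producer = SimpleProducer(client)
    producer.send_messages(b'my-topic', b'payload')

    producer.stop()          # flags the producer as stopped
    assert producer.stopped  # __del__ now calls stop() if this was never done
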
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 68c70d9..36328ed 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -15,17 +15,19 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
- Args:
- client - The kafka client instance
- partitioner - A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner
- async - If True, the messages are sent asynchronously via another
+ Arguments:
+ client: The kafka client instance
+
+ Keyword Arguments:
+ partitioner: A partitioner class that will be used to get the partition
+ to send the message to. Must be derived from Partitioner
+ async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
"""
def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
@@ -52,7 +54,7 @@ class KeyedProducer(Producer):
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
partitioner = self.partitioners[topic]
- return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
+ return partitioner.partition(key)
def send_messages(self,topic,key,*msg):
partition = self._next_partition(topic, key)
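
A minimal sketch of send_messages with the per-topic partitioner above (assumes a connected KafkaClient named `client`; topic, keys, and payloads are illustrative):

.. code:: python

    from kafka import KeyedProducer
    from kafka.partitioner.hashed import HashedPartitioner

    producer = KeyedProducer(client, partitioner=HashedPartitioner)
    producer.send_messages(b'my-topic', b'key1', b'first', b'second')
    producer.send_messages(b'my-topic', b'key2', b'third')
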
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 401b79b..2699cf2 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import logging
import random
+import six
from itertools import cycle
@@ -19,21 +20,23 @@ class SimpleProducer(Producer):
"""
A simple, round-robin producer. Each message goes to exactly one partition
- Params:
- client - The Kafka client instance to use
- async - If True, the messages are sent asynchronously via another
+ Arguments:
+ client: The Kafka client instance to use
+
+ Keyword Arguments:
+ async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
- req_acks - A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
- random_start - If true, randomize the initial partition which the
- the first message block will be published to, otherwise
- if false, the first message block will always publish
- to partition 0 before cycling through each partition
+ req_acks: A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
+        random_start: If true, randomize the initial partition to which the
+            first message block will be published; otherwise the first
+            message block will always be published to partition 0 before
+            cycling through each partition
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
@@ -66,8 +69,13 @@ class SimpleProducer(Producer):
return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
+ if not isinstance(topic, six.binary_type):
+ topic = topic.encode('utf-8')
+
partition = self._next_partition(topic)
- return super(SimpleProducer, self).send_messages(topic, partition, *msg)
+ return super(SimpleProducer, self).send_messages(
+ topic, partition, *msg
+ )
def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async
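
A minimal sketch of the topic auto-encoding added above (assumes a connected KafkaClient named `client`):

.. code:: python

    from kafka import SimpleProducer

    producer = SimpleProducer(client, random_start=False)
    producer.send_messages(u'my-topic', b'one')   # unicode topic encoded to b'my-topic'
    producer.send_messages(b'my-topic', b'two')   # bytes topics pass through unchanged
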
diff --git a/kafka/protocol.py b/kafka/protocol.py
index a85c7eb..2a39de6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -185,18 +185,18 @@ class KafkaProtocol(object):
"""
Encode some ProduceRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of ProduceRequest
- acks: How "acky" you want the request to be
- 0: immediate response
- 1: written to disk by the leader
- 2+: waits for this many number of replicas to sync
- -1: waits for all replicas to be in sync
- timeout: Maximum time the server will wait for acks from replicas.
- This is _not_ a socket timeout
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of ProduceRequest
+ acks: How "acky" you want the request to be
+ 0: immediate response
+ 1: written to disk by the leader
+ 2+: waits for this many number of replicas to sync
+ -1: waits for all replicas to be in sync
+ timeout: Maximum time the server will wait for acks from replicas.
+ This is _not_ a socket timeout
+
"""
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -225,9 +225,9 @@ class KafkaProtocol(object):
"""
Decode bytes to a ProduceResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
+
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -248,14 +248,13 @@ class KafkaProtocol(object):
"""
Encodes some FetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of FetchRequest
- max_wait_time: int, how long to block waiting on min_bytes of data
- min_bytes: int, the minimum number of bytes to accumulate before
- returning the response
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of FetchRequest
+ max_wait_time: int, how long to block waiting on min_bytes of data
+ min_bytes: int, the minimum number of bytes to accumulate before
+ returning the response
"""
payloads = [] if payloads is None else payloads
@@ -284,9 +283,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a FetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -333,9 +331,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -360,11 +357,10 @@ class KafkaProtocol(object):
"""
Encode a MetadataRequest
- Params
- ======
- client_id: string
- correlation_id: int
- topics: list of strings
+ Arguments:
+ client_id: string
+ correlation_id: int
+ topics: list of strings
"""
if payloads is None:
topics = [] if topics is None else topics
@@ -388,9 +384,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a MetadataResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
@@ -439,12 +434,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetCommitRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are committing offsets for
+ payloads: list of OffsetCommitRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -470,9 +464,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetCommitResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -491,12 +484,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetFetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are fetching offsets for
+ payloads: list of OffsetFetchRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -522,9 +514,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetFetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
@@ -547,10 +538,10 @@ def create_message(payload, key=None):
"""
Construct a Message
- Params
- ======
- payload: bytes, the payload to send to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payload: bytes, the payload to send to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
return Message(0, 0, key, payload)
@@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
@@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
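
A minimal sketch of the message helpers documented above (key and payloads are illustrative):

.. code:: python

    from kafka.protocol import create_message, create_gzip_message

    single = create_message(b'payload', key=b'routing-key')
    batch = create_gzip_message([b'one', b'two', b'three'], key=b'routing-key')
    # `batch` is a single compressed Message that wraps the three payloads
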
diff --git a/kafka/queue.py b/kafka/queue.py
deleted file mode 100644
index ada495f..0000000
--- a/kafka/queue.py
+++ /dev/null
@@ -1,219 +0,0 @@
-from __future__ import absolute_import
-
-from copy import copy
-import logging
-from multiprocessing import Process, Queue, Event
-from Queue import Empty
-import time
-
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
-
-log = logging.getLogger("kafka")
-
-raise NotImplementedError("Still need to refactor this class")
-
-
-class KafkaConsumerProcess(Process):
- def __init__(self, client, topic, partition, out_queue, barrier,
- consumer_fetch_size=1024, consumer_sleep=200):
- self.client = copy(client)
- self.topic = topic
- self.partition = partition
- self.out_queue = out_queue
- self.barrier = barrier
- self.consumer_fetch_size = consumer_fetch_size
- self.consumer_sleep = consumer_sleep / 1000.
- log.info("Initializing %s" % self)
- Process.__init__(self)
-
- def __str__(self):
- return "[KafkaConsumerProcess: topic=%s, \
- partition=%s, sleep=%s]" % \
- (self.topic, self.partition, self.consumer_sleep)
-
- def run(self):
- self.barrier.wait()
- log.info("Starting %s" % self)
- fetchRequest = FetchRequest(self.topic, self.partition,
- offset=0, size=self.consumer_fetch_size)
-
- while True:
- if self.barrier.is_set() is False:
- log.info("Shutdown %s" % self)
- self.client.close()
- break
-
- lastOffset = fetchRequest.offset
- (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
-
- if fetchRequest.offset == lastOffset:
- log.debug("No more data for this partition, "
- "sleeping a bit (200ms)")
- time.sleep(self.consumer_sleep)
- continue
-
- for message in messages:
- self.out_queue.put(message)
-
-
-class KafkaProducerProcess(Process):
- def __init__(self, client, topic, in_queue, barrier,
- producer_flush_buffer=500,
- producer_flush_timeout=2000,
- producer_timeout=100):
-
- self.client = copy(client)
- self.topic = topic
- self.in_queue = in_queue
- self.barrier = barrier
- self.producer_flush_buffer = producer_flush_buffer
- self.producer_flush_timeout = producer_flush_timeout / 1000.
- self.producer_timeout = producer_timeout / 1000.
- log.info("Initializing %s" % self)
- Process.__init__(self)
-
- def __str__(self):
- return "[KafkaProducerProcess: topic=%s, \
- flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
- (self.topic,
- self.producer_flush_buffer,
- self.producer_flush_timeout,
- self.producer_timeout)
-
- def run(self):
- self.barrier.wait()
- log.info("Starting %s" % self)
- messages = []
- last_produce = time.time()
-
- def flush(messages):
- self.client.send_message_set(ProduceRequest(self.topic, -1,
- messages))
- del messages[:]
-
- while True:
- if self.barrier.is_set() is False:
- log.info("Shutdown %s, flushing messages" % self)
- flush(messages)
- self.client.close()
- break
-
- if len(messages) > self.producer_flush_buffer:
- log.debug("Message count threshold reached. Flushing messages")
- flush(messages)
- last_produce = time.time()
-
- elif (time.time() - last_produce) > self.producer_flush_timeout:
- log.debug("Producer timeout reached. Flushing messages")
- flush(messages)
- last_produce = time.time()
-
- try:
- msg = KafkaClient.create_message(
- self.in_queue.get(True, self.producer_timeout))
- messages.append(msg)
-
- except Empty:
- continue
-
-
-class KafkaQueue(object):
- def __init__(self, client, topic, partitions,
- producer_config=None, consumer_config=None):
- """
- KafkaQueue a Queue-like object backed by a Kafka producer and some
- number of consumers
-
- Messages are eagerly loaded by the consumer in batches of size
- consumer_fetch_size.
- Messages are buffered in the producer thread until
- producer_flush_timeout or producer_flush_buffer is reached.
-
- Params
- ======
- client: KafkaClient object
- topic: str, the topic name
- partitions: list of ints, the partions to consume from
- producer_config: dict, see below
- consumer_config: dict, see below
-
- Consumer Config
- ===============
- consumer_fetch_size: int, number of bytes to fetch in one call
- to Kafka. Default is 1024
- consumer_sleep: int, time in milliseconds a consumer should sleep
- when it reaches the end of a partition. Default is 200
-
- Producer Config
- ===============
- producer_timeout: int, time in milliseconds a producer should
- wait for messages to enqueue for producing.
- Default is 100
- producer_flush_timeout: int, time in milliseconds a producer
- should allow messages to accumulate before
- sending to Kafka. Default is 2000
- producer_flush_buffer: int, number of messages a producer should
- allow to accumulate. Default is 500
-
- """
- producer_config = {} if producer_config is None else producer_config
- consumer_config = {} if consumer_config is None else consumer_config
-
- self.in_queue = Queue()
- self.out_queue = Queue()
- self.consumers = []
- self.barrier = Event()
-
- # Initialize and start consumer threads
- for partition in partitions:
- consumer = KafkaConsumerProcess(client, topic, partition,
- self.in_queue, self.barrier,
- **consumer_config)
- consumer.start()
- self.consumers.append(consumer)
-
- # Initialize and start producer thread
- self.producer = KafkaProducerProcess(client, topic, self.out_queue,
- self.barrier, **producer_config)
- self.producer.start()
-
- # Trigger everything to start
- self.barrier.set()
-
- def get(self, block=True, timeout=None):
- """
- Consume a message from Kafka
-
- Params
- ======
- block: boolean, default True
- timeout: int, number of seconds to wait when blocking, default None
-
- Returns
- =======
- msg: str, the payload from Kafka
- """
- return self.in_queue.get(block, timeout).payload
-
- def put(self, msg, block=True, timeout=None):
- """
- Send a message to Kafka
-
- Params
- ======
- msg: std, the message to send
- block: boolean, default True
- timeout: int, number of seconds to wait when blocking, default None
- """
- self.out_queue.put(msg, block, timeout)
-
- def close(self):
- """
- Close the internal queues and Kafka consumers/producer
- """
- self.in_queue.close()
- self.out_queue.close()
- self.barrier.clear()
- self.producer.join()
- for consumer in self.consumers:
- consumer.join()
diff --git a/kafka/util.py b/kafka/util.py
index 72ac521..78c3607 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -103,10 +103,12 @@ class ReentrantTimer(object):
A timer that can be restarted, unlike threading.Timer
(although this uses threading.Timer)
- t: timer interval in milliseconds
- fn: a callable to invoke
- args: tuple of args to be passed to function
- kwargs: keyword arguments to be passed to function
+ Arguments:
+
+ t: timer interval in milliseconds
+ fn: a callable to invoke
+ args: tuple of args to be passed to function
+ kwargs: keyword arguments to be passed to function
"""
def __init__(self, t, fn, *args, **kwargs):
@@ -124,7 +126,11 @@ class ReentrantTimer(object):
self.active = None
def _timer(self, active):
- while not active.wait(self.t):
+ # python2.6 Event.wait() always returns None
+ # python2.7 and greater returns the flag value (true/false)
+ # we want the flag value, so add an 'or' here for python2.6
+ # this is redundant for later python versions (FLAG OR FLAG == FLAG)
+ while not (active.wait(self.t) or active.is_set()):
self.fn(*self.args, **self.kwargs)
def start(self):
@@ -144,3 +150,7 @@ class ReentrantTimer(object):
self.thread.join(self.t + 1)
# noinspection PyAttributeOutsideInit
self.timer = None
+ self.fn = None
+
+ def __del__(self):
+ self.stop()
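
A minimal illustration of the python2.6 Event.wait() quirk worked around above:

.. code:: python

    import threading

    flag = threading.Event()
    flag.set()

    flag.wait(0.1)   # True on python >= 2.7, but None on python 2.6
    flag.is_set()    # True on both, so `wait(t) or is_set()` behaves consistently
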
diff --git a/setup.py b/setup.py
index 4b1e18e..f1c1954 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
import sys
-
+import os
from setuptools import setup, Command
with open('VERSION', 'r') as v:
@@ -26,6 +26,10 @@ test_require = ['tox', 'mock']
if sys.version_info < (2, 7):
test_require.append('unittest2')
+here = os.path.abspath(os.path.dirname(__file__))
+
+with open(os.path.join(here, 'README.rst')) as f:
+ README = f.read()
setup(
name="kafka-python",
@@ -46,15 +50,10 @@ setup(
url="https://github.com/mumrah/kafka-python",
license="Apache License 2.0",
description="Pure Python client for Apache Kafka",
- long_description="""
-This module provides low-level protocol support for Apache Kafka as well as
-high-level consumer and producer classes. Request batching is supported by the
-protocol as well as broker-aware request routing. Gzip and Snappy compression
-is also supported for message sets.
-""",
+ long_description=README,
keywords="apache kafka",
install_requires=['six'],
- classifiers = [
+ classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
@@ -62,6 +61,9 @@ is also supported for message sets.
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.3",
+ "Programming Language :: Python :: 3.4",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Libraries :: Python Modules",
]
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4723220..9c89190 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
from kafka.common import (
- ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions('all')
+ def test_simple_consumer_smallest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ consumer = self.consumer(auto_offset_reset='smallest')
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to smallest we should read all 200
+ # messages from beginning.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_largest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer()
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to largest we should not read any
+ # messages.
+ self.assert_message_count([message for message in consumer], 0)
+ # Send 200 new messages to the queue
+ self.send_messages(0, range(200, 300))
+ self.send_messages(1, range(300, 400))
+ # Since the offset is set to largest we should read all the new messages.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_no_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer(auto_offset_reset=None)
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ with self.assertRaises(OffsetOutOfRangeError):
+ consumer.get_message()
+
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index ca71f2d..7d27526 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -7,6 +7,7 @@ from . import unittest
from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer
+from kafka.producer import KeyedProducer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -17,8 +18,7 @@ from test.testutil import (
class TestFailover(KafkaIntegrationTestCase):
create_client = False
- @classmethod
- def setUpClass(cls): # noqa
+ def setUp(self):
if not os.environ.get('KAFKA_VERSION'):
return
@@ -27,33 +27,41 @@ class TestFailover(KafkaIntegrationTestCase):
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
- cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ self.zk = ZookeeperFixture.instance()
+ kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+ self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
- cls.client = KafkaClient(hosts)
+ hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
+ self.client = KafkaClient(hosts)
+ super(TestFailover, self).setUp()
- @classmethod
- def tearDownClass(cls):
+ def tearDown(self):
+ super(TestFailover, self).tearDown()
if not os.environ.get('KAFKA_VERSION'):
return
- cls.client.close()
- for broker in cls.brokers:
+ self.client.close()
+ for broker in self.brokers:
broker.close()
- cls.zk.close()
+ self.zk.close()
@kafka_versions("all")
def test_switch_leader(self):
topic = self.topic
partition = 0
- # Test the base class Producer -- send_messages to a specific partition
+ # Testing the base Producer class here so that we can easily send
+ # messages to a specific partition, kill the leader for that partition
+ # and check that after another broker takes leadership the producer
+ # is able to resume sending messages
+
+ # require that the server commit messages to all in-sync replicas
+ # so that failover doesn't lose any messages on server-side
+ # and we can assert that server-side message count equals client-side
producer = Producer(self.client, async=False,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
- # Send 10 random messages
+ # Send 100 random messages to a specific partition
self._send_random_messages(producer, topic, partition, 100)
# kill leader for partition
@@ -80,7 +88,7 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 100)
# count number of messages
- # Should be equal to 10 before + 1 recovery + 10 after
+ # Should be equal to 100 before + 1 recovery + 100 after
self.assert_message_count(topic, 201, partitions=(partition,))
@@ -116,6 +124,45 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 10 before + 1 recovery + 10 after
self.assert_message_count(topic, 21, partitions=(partition,))
+ @kafka_versions("all")
+ def test_switch_leader_keyed_producer(self):
+ topic = self.topic
+
+ producer = KeyedProducer(self.client, async=False)
+
+ # Send 10 random messages
+ for _ in range(10):
+ key = random_string(3)
+ msg = random_string(10)
+ producer.send_messages(topic, key, msg)
+
+ # kill leader for partition 0
+ self._kill_leader(topic, 0)
+
+ recovered = False
+ started = time.time()
+ timeout = 60
+ while not recovered and (time.time() - started) < timeout:
+ try:
+ key = random_string(3)
+ msg = random_string(10)
+ producer.send_messages(topic, key, msg)
+ if producer.partitioners[topic].partition(key) == 0:
+ recovered = True
+ except (FailedPayloadsError, ConnectionError):
+ logging.debug("caught exception sending message -- will retry")
+ continue
+
+ # Verify we successfully sent the message
+ self.assertTrue(recovered)
+
+ # send some more messages just to make sure no more exceptions
+ for _ in range(10):
+ key = random_string(3)
+ msg = random_string(10)
+ producer.send_messages(topic, key, msg)
+
+
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
diff --git a/test/test_producer.py b/test/test_producer.py
index caf8fe3..f6b3d6a 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,6 +7,7 @@ from . import unittest
from kafka.producer.base import Producer
+
class TestKafkaProducer(unittest.TestCase):
def test_producer_message_types(self):
@@ -25,3 +26,17 @@ class TestKafkaProducer(unittest.TestCase):
# This should not raise an exception
producer.send_messages(topic, partition, m)
+ def test_topic_message_types(self):
+ from kafka.producer.simple import SimpleProducer
+
+ client = MagicMock()
+
+ def partitions(topic):
+ return [0, 1]
+
+ client.get_partition_ids_for_topic = partitions
+
+ producer = SimpleProducer(client, random_start=False)
+ topic = b"test-topic"
+ producer.send_messages(topic, b'hi')
+ assert client.send_produce_request.called
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 19d28bd..38df69f 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -14,12 +14,12 @@ from kafka.common import (
FetchRequest, ProduceRequest,
UnknownTopicOrPartitionError, LeaderNotAvailableError
)
+from kafka.producer.base import Producer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
- topic = b'produce_topic'
@classmethod
def setUpClass(cls): # noqa
@@ -140,25 +140,26 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_simple_producer(self):
- start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
producer = SimpleProducer(self.client, random_start=False)
# Goes to first partition, randomly.
resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- self.assert_produce_response(resp, start_offset0)
+ self.assert_produce_response(resp, start_offsets[0])
# Goes to the next partition, randomly.
resp = producer.send_messages(self.topic, self.msg("three"))
- self.assert_produce_response(resp, start_offset1)
+ self.assert_produce_response(resp, start_offsets[1])
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
- self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
# Goes back to the first partition because there's only two partitions
resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
- self.assert_produce_response(resp, start_offset0+2)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+ self.assert_produce_response(resp, start_offsets[0]+2)
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
producer.stop()
@@ -194,110 +195,38 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(resp3[0].partition, 0)
@kafka_versions("all")
- def test_round_robin_partitioner(self):
- start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
-
- producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
- resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
- resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
- resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
-
- self.assert_produce_response(resp1, start_offset0+0)
- self.assert_produce_response(resp2, start_offset1+0)
- self.assert_produce_response(resp3, start_offset0+1)
- self.assert_produce_response(resp4, start_offset1+1)
-
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
- self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
-
- producer.stop()
-
- @kafka_versions("all")
- def test_hashed_partitioner(self):
- start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
-
- producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
- resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
- resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
- resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
- resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
- resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
-
- offsets = {0: start_offset0, 1: start_offset1}
- messages = {0: [], 1: []}
-
- keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
- resps = [resp1, resp2, resp3, resp4, resp5]
- msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
- for key, resp, msg in zip(keys, resps, msgs):
- k = hash(key) % 2
- offset = offsets[k]
- self.assert_produce_response(resp, offset)
- offsets[k] += 1
- messages[k].append(msg)
-
- self.assert_fetch_offset(0, start_offset0, messages[0])
- self.assert_fetch_offset(1, start_offset1, messages[1])
-
- producer.stop()
-
- @kafka_versions("all")
- def test_acks_none(self):
- start_offset0 = self.current_offset(self.topic, 0)
+ def test_async_simple_producer(self):
+ partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+ start_offset = self.current_offset(self.topic, partition)
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
- random_start=False)
+ producer = SimpleProducer(self.client, async=True, random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
- producer.stop()
-
- @kafka_versions("all")
- def test_acks_local_write(self):
- start_offset0 = self.current_offset(self.topic, 0)
-
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- random_start=False)
- resp = producer.send_messages(self.topic, self.msg("one"))
-
- self.assert_produce_response(resp, start_offset0)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ # wait for the server to report a new highwatermark
+ while self.current_offset(self.topic, partition) == start_offset:
+ time.sleep(0.1)
- producer.stop()
-
- @kafka_versions("all")
- def test_acks_cluster_commit(self):
- start_offset0 = self.current_offset(self.topic, 0)
-
- producer = SimpleProducer(
- self.client,
- req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
- random_start=False)
-
- resp = producer.send_messages(self.topic, self.msg("one"))
- self.assert_produce_response(resp, start_offset0)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
producer.stop()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):
- start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+ # Configure batch producer
+ batch_messages = 5
+ batch_interval = 5
producer = SimpleProducer(
self.client,
batch_send=True,
- batch_send_every_n=5,
- batch_send_every_t=20,
+ batch_send_every_n=batch_messages,
+ batch_send_every_t=batch_interval,
random_start=False)
- # Send 5 messages and do a fetch
+ # Send 4 messages -- should not trigger a batch
resp = producer.send_messages(self.topic,
self.msg("one"),
self.msg("two"),
@@ -309,9 +238,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(resp), 0)
# It hasn't sent yet
- self.assert_fetch_offset(0, start_offset0, [])
- self.assert_fetch_offset(1, start_offset1, [])
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [])
+ # Send 3 more messages -- should trigger a batch send of the first 5
resp = producer.send_messages(self.topic,
self.msg("five"),
self.msg("six"),
@@ -321,30 +251,32 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# Batch mode is async. No ack
self.assertEqual(len(resp), 0)
- self.assert_fetch_offset(0, start_offset0, [
+ # send_messages groups all *msg arguments into a single produce call, so
+ # every message from the first call should land in the same partition
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [
self.msg("one"),
self.msg("two"),
self.msg("three"),
self.msg("four"),
])
- self.assert_fetch_offset(1, start_offset1, [
+ # Because we are batching every 5 messages, only the fifth message should appear here so far
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [
self.msg("five"),
- # self.msg("six"),
- # self.msg("seven"),
])
producer.stop()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_time(self):
- start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+ batch_interval = 5
producer = SimpleProducer(self.client,
batch_send=True,
batch_send_every_n=100,
- batch_send_every_t=5,
+ batch_send_every_t=batch_interval,
random_start=False)
# Send 5 messages and do a fetch
@@ -359,8 +291,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(resp), 0)
# It hasn't sent yet
- self.assert_fetch_offset(0, start_offset0, [])
- self.assert_fetch_offset(1, start_offset1, [])
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [])
resp = producer.send_messages(self.topic,
self.msg("five"),
@@ -372,16 +304,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(resp), 0)
# Wait the timeout out
- time.sleep(5)
+ time.sleep(batch_interval)
- self.assert_fetch_offset(0, start_offset0, [
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [
self.msg("one"),
self.msg("two"),
self.msg("three"),
self.msg("four"),
])
- self.assert_fetch_offset(1, start_offset1, [
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [
self.msg("five"),
self.msg("six"),
self.msg("seven"),
@@ -389,40 +321,146 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+
+ ############################
+ # KeyedProducer Tests #
+ ############################
+
@kafka_versions("all")
- def test_async_simple_producer(self):
- start_offset0 = self.current_offset(self.topic, 0)
+ def test_round_robin_partitioner(self):
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
- producer = SimpleProducer(self.client, async=True, random_start=False)
- resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEqual(len(resp), 0)
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
+ resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
+ resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
+ resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+
+ self.assert_produce_response(resp1, start_offsets[0]+0)
+ self.assert_produce_response(resp2, start_offsets[1]+0)
+ self.assert_produce_response(resp3, start_offsets[0]+1)
+ self.assert_produce_response(resp4, start_offsets[1]+1)
+
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
+
+ producer.stop()
+
+ @kafka_versions("all")
+ def test_hashed_partitioner(self):
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+ producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+ resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
+ resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
+ resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
+ resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
+ resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+
+ offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
+ messages = {partitions[0]: [], partitions[1]: []}
+
+ keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
+ resps = [resp1, resp2, resp3, resp4, resp5]
+ msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
+
+ for key, resp, msg in zip(keys, resps, msgs):
+ k = hash(key) % 2
+ partition = partitions[k]
+ offset = offsets[partition]
+ self.assert_produce_response(resp, offset)
+ offsets[partition] += 1
+ messages[partition].append(msg)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
producer.stop()
@kafka_versions("all")
def test_async_keyed_producer(self):
- start_offset0 = self.current_offset(self.topic, 0)
+ partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+ start_offset = self.current_offset(self.topic, partition)
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
self.assertEqual(len(resp), 0)
- self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+ # wait for the server to report a new highwatermark
+ while self.current_offset(self.topic, partition) == start_offset:
+ time.sleep(0.1)
+
+ self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+ producer.stop()
+
+ ############################
+ # Producer ACK Tests #
+ ############################
+
+ @kafka_versions("all")
+ def test_acks_none(self):
+ partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+ start_offset = self.current_offset(self.topic, partition)
+
+ producer = Producer(
+ self.client,
+ req_acks=Producer.ACK_NOT_REQUIRED,
+ )
+ resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+ # No response from produce request with no acks required
+ self.assertEqual(len(resp), 0)
+
+ # But the message should still have been delivered
+ self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+ producer.stop()
+
+ @kafka_versions("all")
+ def test_acks_local_write(self):
+ partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+ start_offset = self.current_offset(self.topic, partition)
+
+ producer = Producer(
+ self.client,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+ )
+ resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+ self.assert_produce_response(resp, start_offset)
+ self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+ producer.stop()
+
+ @kafka_versions("all")
+ def test_acks_cluster_commit(self):
+ partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+ start_offset = self.current_offset(self.topic, partition)
+
+ producer = Producer(
+ self.client,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+ )
+
+ resp = producer.send_messages(self.topic, partition, self.msg("one"))
+ self.assert_produce_response(resp, start_offset)
+ self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
producer.stop()
- def assert_produce_request(self, messages, initial_offset, message_ct):
- produce = ProduceRequest(self.topic, 0, messages=messages)
+ def assert_produce_request(self, messages, initial_offset, message_ct,
+ partition=0):
+ produce = ProduceRequest(self.topic, partition, messages=messages)
# There should only be one response message from the server.
# This will throw an exception if there's more than one.
resp = self.client.send_produce_request([ produce ])
self.assert_produce_response(resp, initial_offset)
- self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+ self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
def assert_produce_response(self, resp, initial_offset):
self.assertEqual(len(resp), 1)
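The rewritten ACK tests above exercise the base Producer directly rather than SimpleProducer. As a quick reference, here is a minimal sketch of that pattern outside the test harness -- the broker address and topic name are placeholders, and it assumes a reachable broker and a topic that already exists:

    from kafka.client import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')   # placeholder broker address
    topic = b'example_topic'                 # assumed to already exist

    # The base Producer targets an explicit partition and supports three ack levels:
    #   ACK_NOT_REQUIRED         - fire and forget; send_messages() returns []
    #   ACK_AFTER_LOCAL_WRITE    - wait for the partition leader to persist the message
    #   ACK_AFTER_CLUSTER_COMMIT - wait for the full in-sync replica set
    partition = client.get_partition_ids_for_topic(topic)[0]
    producer = Producer(client, req_acks=Producer.ACK_AFTER_LOCAL_WRITE)

    resp = producer.send_messages(topic, partition, b'one')
    print(resp[0].offset)   # offset assigned by the broker to this message
    producer.stop()
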
diff --git a/tox.ini b/tox.ini
index 547470b..71565fd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,6 @@
[tox]
-envlist = lint, py26, py27, pypy, py33, py34
+envlist = lint, py26, py27, pypy, py33, py34, docs
+
[testenv]
deps =
six
@@ -36,4 +37,14 @@ deps =
unittest2
mock
pylint
-commands = pylint {posargs: -E --ignore=queue.py kafka test}
+commands = pylint {posargs: -E kafka test}
+
+[testenv:docs]
+deps =
+ sphinxcontrib-napoleon
+ sphinx_rtd_theme
+ sphinx
+
+commands =
+ sphinx-apidoc -o docs/apidoc/ kafka/
+ sphinx-build -b html docs/ docs/_build
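
With the [testenv:docs] environment added here, the Sphinx documentation can be rebuilt locally through the same tox workflow used for the test environments, e.g. by running tox -e docs, which first regenerates the apidoc stubs under docs/apidoc/ and then builds the HTML output into docs/_build.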