-rw-r--r--  .travis.yml                                       2
-rw-r--r--  AUTHORS.md                                        7
-rw-r--r--  CHANGES.md                                       69
-rw-r--r--  README.rst                                        8
-rwxr-xr-x  build_integration.sh                              2
-rw-r--r--  docs/conf.py                                      5
-rw-r--r--  docs/index.rst                                    6
-rw-r--r--  docs/usage.rst                                   75
-rw-r--r--  kafka/client.py                                  55
-rw-r--r--  kafka/consumer/base.py                           30
-rw-r--r--  kafka/consumer/simple.py                        162
-rw-r--r--  kafka/partitioner/__init__.py                     5
-rw-r--r--  kafka/partitioner/hashed.py                      96
-rw-r--r--  kafka/producer/base.py                            5
-rw-r--r--  kafka/version.py                                  2
-rw-r--r--  servers/0.8.0/resources/kafka.properties          8
-rw-r--r--  servers/0.8.1.1/resources/kafka.properties        6
-rw-r--r--  servers/0.8.1/resources/kafka.properties          8
-rw-r--r--  servers/0.8.2.0/resources/kafka.properties        6
-rw-r--r--  servers/0.8.2.1/resources/kafka.properties      124
-rw-r--r--  servers/0.8.2.1/resources/log4j.properties       24
-rw-r--r--  servers/0.8.2.1/resources/zookeeper.properties   21
-rw-r--r--  test/fixtures.py                                 18
-rw-r--r--  test/test_client_integration.py                  34
-rw-r--r--  test/test_consumer.py                            44
-rw-r--r--  test/test_consumer_integration.py                24
-rw-r--r--  test/test_failover_integration.py                40
27 files changed, 718 insertions(+), 168 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 7184bc8..136c19f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,7 +12,7 @@ env:
- KAFKA_VERSION=0.8.0
- KAFKA_VERSION=0.8.1
- KAFKA_VERSION=0.8.1.1
- - KAFKA_VERSION=0.8.2.0
+ - KAFKA_VERSION=0.8.2.1
before_install:
- sudo apt-get install libsnappy-dev
diff --git a/AUTHORS.md b/AUTHORS.md
index 67e3789..d9ce2ed 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -1,16 +1,19 @@
# Contributors
-Top 10 contributors, listed by contribution. See https://github.com/mumrah/kafka-python/graphs/contributors for the full list
+Top contributors, listed by contribution. See https://github.com/mumrah/kafka-python/graphs/contributors for the full list
* David Arthur, [@mumrah](https://github.com/mumrah)
* Dana Powers, [@dpkp](https://github.com/dpkp)
* Mahendra M, [@mahendra](https://github.com/mahendra)
* Mark Roberts, [@wizzat](https://github.com/wizzat)
* Omar, [@rdiomar](https://github.com/rdiomar) - RIP, Omar. 2014
+* Viktor Shlapakov, [@vshlapakov](https://github.com/vshlapakov)
* Bruno Renié, [@brutasse](https://github.com/brutasse)
* Marc Labbé, [@mrtheb](https://github.com/mrtheb)
+* John Anderson, [@sontek](https://github.com/sontek)
* Ivan Pouzyrevsky, [@sandello](https://github.com/sandello)
* Thomas Dimson, [@cosbynator](https://github.com/cosbynator)
-* Zack Dever, [@zever](https://github.com/zever)
+* Alex Couture-Beil, [@alexcb](https://github.com/alexcb)
+* Zack Dever, [@zackdever](https://github.com/zackdever)
Thanks to all who have contributed!
diff --git a/CHANGES.md b/CHANGES.md
index 5704afa..c94cbd5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,72 @@
+# 0.9.4 (June 11, 2015)
+
+Consumers
+* Refactor SimpleConsumer internal fetch handling (dpkp PR 399)
+* Handle exceptions in SimpleConsumer commit() and reset_partition_offset() (dpkp PR 404)
+* Improve FailedPayloadsError handling in KafkaConsumer (dpkp PR 398)
+* KafkaConsumer: avoid raising KeyError in task_done (dpkp PR 389)
+* MultiProcessConsumer -- support configured partitions list (dpkp PR 380)
+* Fix SimpleConsumer leadership change handling (dpkp PR 393)
+* Fix SimpleConsumer connection error handling (reAsOn2010 PR 392)
+* Improve Consumer handling of 'falsy' partition values (wting PR 342)
+* Fix _offsets call error in KafkaConsumer (hellais PR 376)
+* Fix str/bytes bug in KafkaConsumer (dpkp PR 365)
+* Register atexit handlers for consumer and producer thread/multiprocess cleanup (dpkp PR 360)
+* Always fetch commit offsets in base consumer unless group is None (dpkp PR 356)
+* Stop consumer threads on delete (dpkp PR 357)
+* Deprecate metadata_broker_list in favor of bootstrap_servers in KafkaConsumer (dpkp PR 340)
+* Support pass-through parameters in multiprocess consumer (scrapinghub PR 336)
+* Enable offset commit on SimpleConsumer.seek (ecanzonieri PR 350)
+* Improve multiprocess consumer partition distribution (scrapinghub PR 335)
+* Ignore messages with offset less than requested (wkiser PR 328)
+* Handle OffsetOutOfRange in SimpleConsumer (ecanzonieri PR 296)
+
+Producers
+* Add Murmur2Partitioner (dpkp PR 378)
+* Log error types in SimpleProducer and SimpleConsumer (dpkp PR 405)
+* SimpleProducer support configuration of fail_on_error (dpkp PR 396)
+* Deprecate KeyedProducer.send() (dpkp PR 379)
+* Further improvements to async producer code (dpkp PR 388)
+* Add more configuration parameters for async producer (dpkp)
+* Deprecate SimpleProducer batch_send=True in favor of async (dpkp)
+* Improve async producer error handling and retry logic (vshlapakov PR 331)
+* Support message keys in async producer (vshlapakov PR 329)
+* Use threading instead of multiprocessing for Async Producer (vshlapakov PR 330)
+* Stop threads on __del__ (chmduquesne PR 324)
+* Fix leadership failover handling in KeyedProducer (dpkp PR 314)
+
+KafkaClient
+* Add .topics property for list of known topics (dpkp)
+* Fix request / response order guarantee bug in KafkaClient (dpkp PR 403)
+* Improve KafkaClient handling of connection failures in _get_conn (dpkp)
+* Client clears local metadata cache before updating from server (dpkp PR 367)
+* KafkaClient should return a response or error for each request - enable better retry handling (dpkp PR 366)
+* Improve str/bytes conversion in KafkaClient and KafkaConsumer (dpkp PR 332)
+* Always return sorted partition ids in client.get_partition_ids_for_topic() (dpkp PR 315)
+
+Documentation
+* Cleanup Usage Documentation
+* Improve KafkaConsumer documentation (dpkp PR 341)
+* Update consumer documentation (sontek PR 317)
+* Add doc configuration for tox (sontek PR 316)
+* Switch to .rst doc format (sontek PR 321)
+* Fixup google groups link in README (sontek PR 320)
+* Automate documentation at kafka-python.readthedocs.org
+
+Internals
+* Switch integration testing from 0.8.2.0 to 0.8.2.1 (dpkp PR 402)
+* Fix most flaky tests, improve debug logging, improve fixture handling (dpkp)
+* General style cleanups (dpkp PR 394)
+* Raise error on duplicate topic-partition payloads in protocol grouping (dpkp)
+* Use module-level loggers instead of simply 'kafka' (dpkp)
+* Remove pkg_resources check for __version__ at runtime (dpkp PR 387)
+* Make external API consistently support python3 strings for topic (kecaps PR 361)
+* Fix correlation id overflow (dpkp PR 355)
+* Cleanup kafka/common structs (dpkp PR 338)
+* Use context managers in gzip_encode / gzip_decode (dpkp PR 337)
+* Save failed request as FailedPayloadsError attribute (jobevers PR 302)
+* Remove unused kafka.queue (mumrah)
+
# 0.9.3 (Feb 3, 2015)
* Add coveralls.io support (sontek PR 307)
diff --git a/README.rst b/README.rst
index 5405f92..e957ee3 100644
--- a/README.rst
+++ b/README.rst
@@ -32,11 +32,11 @@ Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE`
Status
----------
-The current stable version of this package is `0.9.3`_ and is compatible with:
+The current stable version of this package is `0.9.4`_ and is compatible with:
Kafka broker versions
-- 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
+- 0.8.2.1 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
- 0.8.1.1
- 0.8.1
- 0.8.0
@@ -47,7 +47,7 @@ Python versions
- 2.7 (tested on 2.7.9)
- 3.3 (tested on 3.3.5)
- 3.4 (tested on 3.4.2)
-- pypy (tested on pypy 2.4.0 / python 2.7.8)
+- pypy (tested on pypy 2.5.0 / python 2.7.8)
.. _Full documentation available on ReadTheDocs: http://kafka-python.readthedocs.org/en/latest/
-.. _0.9.3: https://github.com/mumrah/kafka-python/releases/tag/v0.9.3
+.. _0.9.4: https://github.com/mumrah/kafka-python/releases/tag/v0.9.4
diff --git a/build_integration.sh b/build_integration.sh
index 2b81745..5395bb8 100755
--- a/build_integration.sh
+++ b/build_integration.sh
@@ -1,7 +1,7 @@
#!/bin/bash
# Versions available for testing via binary distributions
-OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.0"
+OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.1"
# Useful configuration vars, with sensible defaults
if [ -z "$SCALA_VERSION" ]; then
diff --git a/docs/conf.py b/docs/conf.py
index ea223c2..2979560 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -56,11 +56,10 @@ copyright = u'2015, David Arthur'
# built documents.
#
# The short X.Y version.
-with open('../VERSION') as version_file:
- version = version_file.read()
+exec(open('../kafka/version.py').read())
# The full version, including alpha/beta/rc tags.
-release = version
+release = __version__
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/docs/index.rst b/docs/index.rst
index e4a9ac7..c499d4c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -16,11 +16,11 @@ see https://groups.google.com/forum/m/#!forum/kafka-clients
Status
------
-The current stable version of this package is `0.9.3 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.3>`_ and is compatible with:
+The current stable version of this package is `0.9.4 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.4>`_ and is compatible with:
Kafka broker versions
-* 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
+* 0.8.2.1 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs]
* 0.8.1.1
* 0.8.1
* 0.8.0
@@ -31,7 +31,7 @@ Python versions
* 2.7 (tested on 2.7.9)
* 3.3 (tested on 3.3.5)
* 3.4 (tested on 3.4.2)
-* pypy (tested on pypy 2.4.0 / python 2.7.8)
+* pypy (tested on pypy 2.5.0 / python 2.7.8)
License
-------
diff --git a/docs/usage.rst b/docs/usage.rst
index ca326d4..6417cd8 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -9,21 +9,24 @@ SimpleProducer
from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
- # Note that the application is responsible for encoding messages to type str
- producer.send_messages("my-topic", "some message")
- producer.send_messages("my-topic", "this method", "is variadic")
+ # Note that the application is responsible for encoding messages to type bytes
+ producer.send_messages(b'my-topic', b'some message')
+ producer.send_messages(b'my-topic', b'this method', b'is variadic')
# Send unicode message
- producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+ producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
# To send messages asynchronously
- # WARNING: current implementation does not guarantee message delivery on failure!
- # messages can get dropped! Use at your own risk! Or help us improve with a PR!
producer = SimpleProducer(kafka, async=True)
- producer.send_messages("my-topic", "async message")
+ producer.send_messages(b'my-topic', b'async message')
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
@@ -32,13 +35,12 @@ SimpleProducer
# by all in sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=2000)
-
- response = producer.send_messages("my-topic", "another message")
+ ack_timeout=2000,
+ sync_fail_on_error=False)
- if response:
- print(response[0].error)
- print(response[0].offset)
+ responses = producer.send_messages(b'my-topic', b'another message')
+ for r in responses:
+ logging.info(r.offset)
# To send messages in batch. You can use any of the available
# producers for doing this. The following producer will collect
@@ -56,16 +58,21 @@ Keyed messages
.. code:: python
- from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
- RoundRobinPartitioner)
+ from kafka import (
+ KafkaClient, KeyedProducer,
+ Murmur2Partitioner, RoundRobinPartitioner)
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
- # HashedPartitioner is default
+ # HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
- producer.send_messages("my-topic", "key1", "some message")
- producer.send_messages("my-topic", "key2", "this methode")
+ producer.send_messages(b'my-topic', b'key1', b'some message')
+    producer.send_messages(b'my-topic', b'key2', b'this method')
+ # Murmur2Partitioner attempts to mirror the java client hashing
+ producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+ # Or just produce round-robin (or just use SimpleProducer)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
@@ -78,9 +85,9 @@ KafkaConsumer
from kafka import KafkaConsumer
# To consume messages
- consumer = KafkaConsumer("my-topic",
- group_id="my_group",
- bootstrap_servers=["localhost:9092"])
+ consumer = KafkaConsumer('my-topic',
+ group_id='my_group',
+ bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value is raw byte string -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
@@ -88,8 +95,6 @@ KafkaConsumer
message.offset, message.key,
message.value))
- kafka.close()
-
messages (m) are namedtuples with attributes:
@@ -121,16 +126,16 @@ messages (m) are namedtuples with attributes:
# so it can be included in the next commit
#
# **messages that are not marked w/ task_done currently do not commit!
- kafka.task_done(m)
+ consumer.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
- kafka.commit()
+ consumer.commit()
# Batch process interface
while True:
for m in kafka.fetch_messages():
process_message(m)
- kafka.task_done(m)
+ consumer.task_done(m)
Configuration settings can be passed to constructor,
@@ -162,13 +167,13 @@ Multiprocess consumer
from kafka import KafkaClient, MultiProcessConsumer
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
# This will split the number of partitions among two processes
- consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
+ consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
# This will spawn processes such that each handles 2 partitions max
- consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
+ consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
partitions_per_proc=2)
for message in consumer:
@@ -186,14 +191,14 @@ Low level
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
- req = ProduceRequest(topic="my-topic", partition=1,
- messages=[create_message("some message")])
+ req = ProduceRequest(topic=b'my-topic', partition=1,
+ messages=[create_message(b'some message')])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
- resps[0].topic # "my-topic"
+ resps[0].topic # b'my-topic'
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
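
The usage changes above switch the synchronous producer example to sync_fail_on_error=False, which makes send_messages() return per-request responses instead of raising. A minimal sketch of inspecting that result list, assuming failed requests surface as FailedPayloadsError entries in the returned list (consistent with the client.py changes later in this commit); the broker address and topic are illustrative:

    import logging

    from kafka import SimpleProducer, KafkaClient
    from kafka.common import FailedPayloadsError

    logging.basicConfig(level=logging.INFO)

    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka, sync_fail_on_error=False)

    # With sync_fail_on_error=False the producer does not raise; each entry in
    # the returned list is either a produce response or a FailedPayloadsError.
    for resp in producer.send_messages(b'my-topic', b'another message'):
        if isinstance(resp, FailedPayloadsError):
            logging.warning('send failed: %s', resp.payload)
        else:
            logging.info('offset: %d', resp.offset)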
diff --git a/kafka/client.py b/kafka/client.py
index da86175..13777a4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -138,7 +138,8 @@ class KafkaClient(object):
Arguments:
payloads: list of object-like entities with a topic (str) and
- partition (int) attribute
+ partition (int) attribute; payloads with duplicate topic-partitions
+ are not supported.
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
@@ -152,20 +153,29 @@ class KafkaClient(object):
List of response objects in the same order as the supplied payloads
"""
+ # encoders / decoders do not maintain ordering currently
+ # so we need to keep this so we can rebuild order before returning
+ original_ordering = [(p.topic, p.partition) for p in payloads]
+
# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
+ responses = {}
for payload in payloads:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
-
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
+ try:
+ leader = self._get_leader_for_partition(payload.topic,
+ payload.partition)
+ payloads_by_broker[leader].append(payload)
+ brokers_for_payloads.append(leader)
+ except KafkaUnavailableError as e:
+ log.warning('KafkaUnavailableError attempting to send request '
+ 'on topic %s partition %d', payload.topic, payload.partition)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
# For each broker, send the list of request payloads
# and collect the responses and errors
- responses_by_broker = collections.defaultdict(list)
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
@@ -184,7 +194,8 @@ class KafkaClient(object):
'to server %s: %s', requestId, broker, e)
for payload in payloads:
- responses_by_broker[broker].append(FailedPayloadsError(payload))
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
# No exception, try to get response
else:
@@ -196,7 +207,8 @@ class KafkaClient(object):
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
- responses_by_broker[broker].append(None)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = None
continue
try:
@@ -208,12 +220,17 @@ class KafkaClient(object):
requestId, broker, e)
for payload in payloads:
- responses_by_broker[broker].append(FailedPayloadsError(payload))
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
else:
+ _resps = []
for payload_response in decoder_fn(response):
- responses_by_broker[broker].append(payload_response)
- log.debug('Response %s: %s', requestId, responses_by_broker[broker])
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
@@ -223,9 +240,7 @@ class KafkaClient(object):
self.reset_all_metadata()
# Return responses in the same order as provided
- responses_by_payload = [responses_by_broker[broker].pop(0)
- for broker in brokers_for_payloads]
- return responses_by_payload
+ return [responses[tp] for tp in original_ordering]
def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
@@ -296,6 +311,10 @@ class KafkaClient(object):
return sorted(list(self.topic_partitions[topic]))
+ @property
+ def topics(self):
+ return list(self.topic_partitions.keys())
+
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
@@ -349,8 +368,8 @@ class KafkaClient(object):
resp = self.send_metadata_request(topics)
- log.info('Updating broker metadata: %s', resp.brokers)
- log.info('Updating topic metadata: %s', resp.topics)
+ log.debug('Updating broker metadata: %s', resp.brokers)
+ log.debug('Updating topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -395,7 +414,7 @@ class KafkaClient(object):
# this error code is provided for admin purposes only
# we never talk to replicas, only the leader
except ReplicaNotAvailableError:
- log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+ log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
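
The client.py changes above stop queuing responses per broker and instead key them by (topic, partition), rebuilding the caller's original payload order before returning. A standalone sketch of that ordering idea, using illustrative stand-ins (send_to_broker, leader_for) rather than the client's internals:

    import collections

    def send_grouped(payloads, send_to_broker, leader_for):
        # Remember the caller's ordering before grouping by broker
        original_ordering = [(p.topic, p.partition) for p in payloads]

        payloads_by_broker = collections.defaultdict(list)
        for p in payloads:
            payloads_by_broker[leader_for(p.topic, p.partition)].append(p)

        # Collect responses keyed by topic-partition, whatever order they arrive in
        responses = {}
        for broker, broker_payloads in payloads_by_broker.items():
            for resp in send_to_broker(broker, broker_payloads):
                responses[(resp.topic, resp.partition)] = resp

        # Re-emit responses in the caller's original payload order
        return [responses[tp] for tp in original_ordering]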
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 6365cfa..0800327 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -8,7 +8,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- UnknownTopicOrPartitionError, check_error
+ UnknownTopicOrPartitionError, check_error, KafkaError
)
from kafka.util import kafka_bytestring, ReentrantTimer
@@ -114,12 +114,13 @@ class Consumer(object):
self.offsets[resp.partition] = resp.offset
def commit(self, partitions=None):
- """
- Commit offsets for this consumer
+ """Commit stored offsets to Kafka via OffsetCommitRequest (v0)
Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them
+
+ Returns: True on success, False on failure
"""
# short circuit if nothing happened. This check is kept outside
@@ -135,22 +136,27 @@ class Consumer(object):
reqs = []
if partitions is None: # commit all partitions
- partitions = self.offsets.keys()
+ partitions = list(self.offsets.keys())
+ log.debug('Committing new offsets for %s, partitions %s',
+ self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: "
- "group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
+ log.debug('Commit offset %d in SimpleConsumer: '
+ 'group=%s, topic=%s, partition=%s',
+ offset, self.group, self.topic, partition)
reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))
- resps = self.client.send_offset_commit_request(self.group, reqs)
- for resp in resps:
- kafka.common.check_error(resp)
-
- self.count_since_commit = 0
+ try:
+ self.client.send_offset_commit_request(self.group, reqs)
+ except KafkaError as e:
+ log.error('%s saving offsets: %s', e.__class__.__name__, e)
+ return False
+ else:
+ self.count_since_commit = 0
+ return True
def _auto_commit(self):
"""
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index e4233ff..733baa8 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, OffsetRequest,
+ FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -126,8 +126,8 @@ class SimpleConsumer(Consumer):
auto_commit_every_t=auto_commit_every_t)
if max_buffer_size is not None and buffer_size > max_buffer_size:
- raise ValueError("buffer_size (%d) is greater than "
- "max_buffer_size (%d)" %
+ raise ValueError('buffer_size (%d) is greater than '
+ 'max_buffer_size (%d)' %
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.max_buffer_size = max_buffer_size
@@ -144,6 +144,13 @@ class SimpleConsumer(Consumer):
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
+ """Update offsets using auto_offset_reset policy (smallest|largest)
+
+ Arguments:
+ partition (int): the partition for which offsets should be updated
+
+ Returns: Updated offset on success, None on failure
+ """
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ class SimpleConsumer(Consumer):
raise
# send_offset_request
- (resp, ) = self.client.send_offset_request(reqs)
- check_error(resp)
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
+ log.info('Resetting topic-partition offset to %s for %s:%d',
+ self.auto_offset_reset, self.topic, partition)
+ try:
+ (resp, ) = self.client.send_offset_request(reqs)
+ except KafkaError as e:
+ log.error('%s sending offset request for %s:%d',
+ e.__class__.__name__, self.topic, partition)
+ else:
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+ return resp.offsets[0]
def provide_partition_info(self):
"""
@@ -174,33 +188,62 @@ class SimpleConsumer(Consumer):
"""
self.partition_info = True
- def seek(self, offset, whence):
+ def seek(self, offset, whence=None, partition=None):
"""
Alter the current offset in the consumer, similar to fseek
Arguments:
offset: how much to modify the offset
- whence: where to modify it from
+ whence: where to modify it from, default is None
- * 0 is relative to the earliest available offset (head)
- * 1 is relative to the current offset
- * 2 is relative to the latest known offset (tail)
+ * None is an absolute offset
+ * 0 is relative to the earliest available offset (head)
+ * 1 is relative to the current offset
+ * 2 is relative to the latest known offset (tail)
+
+ partition: modify which partition, default is None.
+ If partition is None, would modify all partitions.
"""
- if whence == 1: # relative to current position
- for partition, _offset in self.offsets.items():
- self.offsets[partition] = _offset + offset
+ if whence is None: # set an absolute offset
+ if partition is None:
+ for tmp_partition in self.offsets:
+ self.offsets[tmp_partition] = offset
+ else:
+ self.offsets[partition] = offset
+ elif whence == 1: # relative to current position
+ if partition is None:
+ for tmp_partition, _offset in self.offsets.items():
+ self.offsets[tmp_partition] = _offset + offset
+ else:
+ self.offsets[partition] += offset
elif whence in (0, 2): # relative to beginning or end
- # divide the request offset by number of partitions,
- # distribute the remained evenly
- (delta, rem) = divmod(offset, len(self.offsets))
- deltas = {}
- for partition, r in izip_longest(self.offsets.keys(),
- repeat(1, rem), fillvalue=0):
- deltas[partition] = delta + r
-
reqs = []
- for partition in self.offsets.keys():
+ deltas = {}
+ if partition is None:
+ # divide the request offset by number of partitions,
+ # distribute the remained evenly
+ (delta, rem) = divmod(offset, len(self.offsets))
+ for tmp_partition, r in izip_longest(self.offsets.keys(),
+ repeat(1, rem),
+ fillvalue=0):
+ deltas[tmp_partition] = delta + r
+
+ for tmp_partition in self.offsets.keys():
+ if whence == 0:
+ reqs.append(OffsetRequest(self.topic,
+ tmp_partition,
+ -2,
+ 1))
+ elif whence == 2:
+ reqs.append(OffsetRequest(self.topic,
+ tmp_partition,
+ -1,
+ 1))
+ else:
+ pass
+ else:
+ deltas[partition] = offset
if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
@@ -213,7 +256,7 @@ class SimpleConsumer(Consumer):
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
- raise ValueError("Unexpected value for `whence`, %d" % whence)
+ raise ValueError('Unexpected value for `whence`, %d' % whence)
# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
@@ -236,35 +279,32 @@ class SimpleConsumer(Consumer):
"""
messages = []
if timeout is not None:
- max_time = time.time() + timeout
+ timeout += time.time()
new_offsets = {}
- while count > 0 and (timeout is None or timeout > 0):
- result = self._get_message(block, timeout, get_partition_info=True,
+ log.debug('getting %d messages', count)
+ while len(messages) < count:
+ block_time = timeout - time.time()
+ log.debug('calling _get_message block=%s timeout=%s', block, block_time)
+ result = self._get_message(block, block_time,
+ get_partition_info=True,
update_offset=False)
- if result:
- partition, message = result
- if self.partition_info:
- messages.append(result)
- else:
- messages.append(message)
- new_offsets[partition] = message.offset + 1
- count -= 1
- else:
- # Ran out of messages for the last request.
- if not block:
- # If we're not blocking, break.
- break
+ log.debug('got %s from _get_messages', result)
+ if not result:
+ if block and (timeout is None or time.time() <= timeout):
+ continue
+ break
- # If we have a timeout, reduce it to the
- # appropriate value
- if timeout is not None:
- timeout = max_time - time.time()
+ partition, message = result
+ _msg = (partition, message) if self.partition_info else message
+ messages.append(_msg)
+ new_offsets[partition] = message.offset + 1
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
+ log.debug('got %d messages: %s', len(messages), messages)
return messages
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
@@ -278,10 +318,16 @@ class SimpleConsumer(Consumer):
If get_partition_info is True, returns (partition, message)
If get_partition_info is False, returns message
"""
- if self.queue.empty():
+ start_at = time.time()
+ while self.queue.empty():
# We're out of messages, go grab some more.
+ log.debug('internal queue empty, fetching more messages')
with FetchContext(self, block, timeout):
self._fetch()
+
+ if not block or time.time() > (start_at + timeout):
+ break
+
try:
partition, message = self.queue.get_nowait()
@@ -300,6 +346,7 @@ class SimpleConsumer(Consumer):
else:
return message
except Empty:
+ log.debug('internal queue empty after fetch - returning None')
return None
def __iter__(self):
@@ -344,23 +391,26 @@ class SimpleConsumer(Consumer):
try:
check_error(resp)
except UnknownTopicOrPartitionError:
+ log.error('UnknownTopicOrPartitionError for %s:%d',
+ resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
raise
except NotLeaderForPartitionError:
+ log.error('NotLeaderForPartitionError for %s:%d',
+ resp.topic, resp.partition)
self.client.reset_topic_metadata(resp.topic)
continue
except OffsetOutOfRangeError:
- log.warning("OffsetOutOfRangeError for %s - %d. "
- "Resetting partition offset...",
+ log.warning('OffsetOutOfRangeError for %s:%d. '
+ 'Resetting partition offset...',
resp.topic, resp.partition)
self.reset_partition_offset(resp.partition)
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
except FailedPayloadsError as e:
- log.warning("Failed payloads of %s"
- "Resetting partition offset...",
- e.payload)
+ log.warning('FailedPayloadsError for %s:%d',
+ e.payload.topic, e.payload.partition)
# Retry this partition
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
continue
@@ -379,7 +429,7 @@ class SimpleConsumer(Consumer):
except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
buffer_size == self.max_buffer_size):
- log.error("Max fetch size %d too small",
+ log.error('Max fetch size %d too small',
self.max_buffer_size)
raise
if self.max_buffer_size is None:
@@ -387,12 +437,12 @@ class SimpleConsumer(Consumer):
else:
buffer_size = min(buffer_size * 2,
self.max_buffer_size)
- log.warn("Fetch size too small, increase to %d (2x) "
- "and retry", buffer_size)
+ log.warning('Fetch size too small, increase to %d (2x) '
+ 'and retry', buffer_size)
retry_partitions[partition] = buffer_size
except ConsumerNoMoreData as e:
- log.debug("Iteration was ended by %r", e)
+ log.debug('Iteration was ended by %r', e)
except StopIteration:
# Stop iterating through this partition
- log.debug("Done iterating over partition %s" % partition)
+ log.debug('Done iterating over partition %s', partition)
partitions = retry_partitions
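
The seek() signature above now accepts whence=None for absolute positioning and an optional partition argument (mirrored by the new cases in test_consumer_integration.py below). A short usage sketch; the client setup, group, and topic names are illustrative:

    from kafka import KafkaClient, SimpleConsumer

    kafka = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(kafka, b'my-group', b'my-topic')

    consumer.seek(100)               # whence=None: absolute offset 100 on all partitions
    consumer.seek(90, partition=0)   # absolute offset 90 on partition 0 only
    consumer.seek(10, 1)             # advance every partition by 10 (relative to current)
    consumer.seek(-13, 2)            # 13 messages before the tail, as before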
diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py
index fdb19bb..5b6ac2d 100644
--- a/kafka/partitioner/__init__.py
+++ b/kafka/partitioner/__init__.py
@@ -1,6 +1,7 @@
from .roundrobin import RoundRobinPartitioner
-from .hashed import HashedPartitioner
+from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
__all__ = [
- 'RoundRobinPartitioner', 'HashedPartitioner'
+ 'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
+ 'LegacyPartitioner'
]
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index fb5e598..6393ce2 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,8 +1,26 @@
from .base import Partitioner
-class HashedPartitioner(Partitioner):
+
+class Murmur2Partitioner(Partitioner):
"""
Implements a partitioner which selects the target partition based on
+ the hash of the key. Attempts to apply the same hashing
+ function as mainline java client.
+ """
+ def partition(self, key, partitions=None):
+ if not partitions:
+ partitions = self.partitions
+
+ # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
+ idx = (murmur2(key) & 0x7fffffff) % len(partitions)
+
+ return partitions[idx]
+
+
+class LegacyPartitioner(Partitioner):
+ """DEPRECATED -- See Issue 374
+
+ Implements a partitioner which selects the target partition based on
the hash of the key
"""
def partition(self, key, partitions=None):
@@ -12,3 +30,79 @@ class HashedPartitioner(Partitioner):
idx = hash(key) % size
return partitions[idx]
+
+
+# Default will change to Murmur2 in 0.10 release
+HashedPartitioner = LegacyPartitioner
+
+
+# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
+def murmur2(key):
+ """Pure-python Murmur2 implementation.
+
+ Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
+
+ Args:
+ key: if not a bytearray, converted via bytearray(str(key))
+
+ Returns: MurmurHash2 of key bytearray
+ """
+
+ # Convert key to a bytearray
+    # bytearray keys pass through unchanged; anything else is wrapped via str()
+    data = key if isinstance(key, bytearray) else bytearray(str(key))
+
+ length = len(data)
+ seed = 0x9747b28c
+ # 'm' and 'r' are mixing constants generated offline.
+ # They're not really 'magic', they just happen to work well.
+ m = 0x5bd1e995
+ r = 24
+
+ # Initialize the hash to a random value
+ h = seed ^ length
+    length4 = length // 4  # integer division so range() below gets an int
+
+ for i in range(length4):
+ i4 = i * 4
+ k = ((data[i4 + 0] & 0xff) +
+ ((data[i4 + 1] & 0xff) << 8) +
+ ((data[i4 + 2] & 0xff) << 16) +
+ ((data[i4 + 3] & 0xff) << 24))
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+ k ^= (k % 0x100000000) >> r # k ^= k >>> r
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+
+ h *= m
+ h &= 0xffffffff
+ h ^= k
+ h &= 0xffffffff
+
+ # Handle the last few bytes of the input array
+ extra_bytes = length % 4
+    if extra_bytes >= 3:  # emulate the java switch fall-through
+ h ^= (data[(length & ~3) + 2] & 0xff) << 16
+ h &= 0xffffffff
+
+    if extra_bytes >= 2:
+ h ^= (data[(length & ~3) + 1] & 0xff) << 8
+ h &= 0xffffffff
+
+    if extra_bytes >= 1:
+ h ^= (data[length & ~3] & 0xff)
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+
+ h ^= (h % 0x100000000) >> 13 # h >>> 13;
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+ h ^= (h % 0x100000000) >> 15 # h >>> 15;
+ h &= 0xffffffff
+
+ return h
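
The new Murmur2Partitioner mirrors the Java client's key hashing, while HashedPartitioner stays aliased to LegacyPartitioner until the 0.10 release. A brief usage sketch; the broker address, topic, and key values are illustrative, and the direct call passes a bytearray key, which murmur2() accepts unchanged:

    from kafka import KafkaClient, KeyedProducer
    from kafka.partitioner import Murmur2Partitioner

    kafka = KafkaClient('localhost:9092')

    # Route keyed messages with the same hashing as the mainline java client
    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
    producer.send_messages(b'my-topic', b'key1', b'some message')

    # The partitioner can also be applied directly to a known partition list
    chosen = Murmur2Partitioner([0, 1, 2]).partition(bytearray(b'key1'))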
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index bee1888..d5c013a 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -166,8 +166,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_cls:
_handle_error(error_cls, orig_req)
- log.error('Error sending ProduceRequest (#%d of %d) to %s:%d '
- 'with msgs %s', i + 1, len(requests),
+ log.error('%s sending ProduceRequest (#%d of %d) '
+ 'to %s:%d with msgs %s',
+ error_cls.__name__, (i + 1), len(requests),
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
diff --git a/kafka/version.py b/kafka/version.py
index 5b721ed..cd64b48 100644
--- a/kafka/version.py
+++ b/kafka/version.py
@@ -1 +1 @@
-__version__ = '0.9.4-dev'
+__version__ = '0.9.5-dev'
diff --git a/servers/0.8.0/resources/kafka.properties b/servers/0.8.0/resources/kafka.properties
index c9fd552..b9f5c49 100644
--- a/servers/0.8.0/resources/kafka.properties
+++ b/servers/0.8.0/resources/kafka.properties
@@ -35,6 +35,10 @@ log.dirs={tmp_dir}/data
num.partitions={partitions}
default.replication.factor={replicas}
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
@@ -49,7 +53,11 @@ log.cleanup.interval.mins=1
############################# Zookeeper #############################
zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
+# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
+zookeeper.session.timeout.ms=500
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
diff --git a/servers/0.8.1.1/resources/kafka.properties b/servers/0.8.1.1/resources/kafka.properties
index a638f39..685aed1 100644
--- a/servers/0.8.1.1/resources/kafka.properties
+++ b/servers/0.8.1.1/resources/kafka.properties
@@ -63,6 +63,10 @@ log.dirs={tmp_dir}/data
num.partitions={partitions}
default.replication.factor={replicas}
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
@@ -116,3 +120,5 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
+# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
+zookeeper.session.timeout.ms=500
diff --git a/servers/0.8.1/resources/kafka.properties b/servers/0.8.1/resources/kafka.properties
index 5d47520..76b0cb4 100644
--- a/servers/0.8.1/resources/kafka.properties
+++ b/servers/0.8.1/resources/kafka.properties
@@ -35,6 +35,10 @@ log.dirs={tmp_dir}/data
num.partitions={partitions}
default.replication.factor={replicas}
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
@@ -56,4 +60,8 @@ log.cleaner.enable=false
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
+# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
+zookeeper.session.timeout.ms=500
diff --git a/servers/0.8.2.0/resources/kafka.properties b/servers/0.8.2.0/resources/kafka.properties
index a638f39..685aed1 100644
--- a/servers/0.8.2.0/resources/kafka.properties
+++ b/servers/0.8.2.0/resources/kafka.properties
@@ -63,6 +63,10 @@ log.dirs={tmp_dir}/data
num.partitions={partitions}
default.replication.factor={replicas}
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
@@ -116,3 +120,5 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
+# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
+zookeeper.session.timeout.ms=500
diff --git a/servers/0.8.2.1/resources/kafka.properties b/servers/0.8.2.1/resources/kafka.properties
new file mode 100644
index 0000000..685aed1
--- /dev/null
+++ b/servers/0.8.2.1/resources/kafka.properties
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma seperated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
+zookeeper.session.timeout.ms=500
diff --git a/servers/0.8.2.1/resources/log4j.properties b/servers/0.8.2.1/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/servers/0.8.2.1/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout
diff --git a/servers/0.8.2.1/resources/zookeeper.properties b/servers/0.8.2.1/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/servers/0.8.2.1/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
diff --git a/test/fixtures.py b/test/fixtures.py
index d4d03ee..164d0d7 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,6 +4,7 @@ import os.path
import shutil
import subprocess
import tempfile
+import time
from six.moves import urllib
import uuid
@@ -125,12 +126,18 @@ class ZookeeperFixture(Fixture):
# Party!
self.out("Starting...")
+ timeout = 5
+ max_timeout = 30
+ backoff = 1
while True:
self.child = SpawnedService(args, env)
self.child.start()
- if self.child.wait_for(r"binding to port", timeout=5):
+ timeout = min(timeout, max_timeout)
+ if self.child.wait_for(r"binding to port", timeout=timeout):
break
self.child.stop()
+ timeout *= 2
+ time.sleep(backoff)
self.out("Done!")
def close(self):
@@ -225,12 +232,19 @@ class KafkaFixture(Fixture):
args = self.kafka_run_class_args("kafka.Kafka", properties)
env = self.kafka_run_class_env()
+ timeout = 5
+ max_timeout = 30
+ backoff = 1
while True:
self.child = SpawnedService(args, env)
self.child.start()
- if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
+ timeout = min(timeout, max_timeout)
+ if self.child.wait_for(r"\[Kafka Server %d\], Started" %
+ self.broker_id, timeout=timeout):
break
self.child.stop()
+ timeout *= 2
+ time.sleep(backoff)
self.out("Done!")
self.running = True
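
Both fixture start loops above now retry with a wait timeout that doubles after each failed attempt (capped at max_timeout) and a short sleep between attempts. A generic sketch of that pattern; start_service and wait_for_ready are hypothetical stand-ins for SpawnedService start/wait_for, and the service object is assumed to expose stop():

    import time

    def start_with_backoff(start_service, wait_for_ready,
                           timeout=5, max_timeout=30, backoff=1):
        """Keep restarting a service until it reports ready.

        The per-attempt wait doubles after each failure, capped at max_timeout.
        """
        while True:
            service = start_service()
            timeout = min(timeout, max_timeout)
            if wait_for_ready(service, timeout=timeout):
                return service
            service.stop()
            timeout *= 2
            time.sleep(backoff)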
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 585123b..8853350 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,8 +2,9 @@ import os
from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
- KafkaTimeoutError
+ KafkaTimeoutError, ProduceRequest
)
+from kafka.protocol import create_message
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
@@ -49,11 +50,40 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+ @kafka_versions('all')
+ def test_send_produce_request_maintains_request_response_order(self):
+
+ self.client.ensure_topic_exists(b'foo')
+ self.client.ensure_topic_exists(b'bar')
+
+ requests = [
+ ProduceRequest(
+ b'foo', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'foo', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ]
+
+ responses = self.client.send_produce_request(requests)
+ while len(responses):
+ request = requests.pop()
+ response = responses.pop()
+ self.assertEqual(request.topic, response.topic)
+ self.assertEqual(request.partition, response.partition)
+
+
####################
# Offset Tests #
####################
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 08fd620..df15115 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,7 +4,7 @@ from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
- KafkaConfigurationError, FetchResponse,
+ KafkaConfigurationError, FetchResponse, OffsetFetchResponse,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
@@ -25,10 +25,11 @@ class TestMultiProcessConsumer(unittest.TestCase):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
+ MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
+class TestSimpleConsumer(unittest.TestCase):
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
@@ -80,6 +81,45 @@ class TestMultiProcessConsumer(unittest.TestCase):
with self.assertRaises(UnknownTopicOrPartitionError):
consumer.get_messages(20)
+ def test_simple_consumer_commit_does_not_raise(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+
+ def mock_offset_fetch_request(group, payloads, **kwargs):
+ return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads]
+
+ client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
+
+ def mock_offset_commit_request(group, payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_commit_request.side_effect = mock_offset_commit_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # Mock internal commit check
+ consumer.count_since_commit = 10
+
+ # This should not raise an exception
+ self.assertFalse(consumer.commit(partitions=[0, 1]))
+
+ def test_simple_consumer_reset_partition_offset(self):
+ client = MagicMock()
+
+ def mock_offset_request(payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_request.side_effect = mock_offset_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # This should not raise an exception
+ self.assertEqual(consumer.reset_partition_offset(0), None)
+
@staticmethod
def fail_requests_factory(error_factory):
# Mock so that only the first request gets a valid response
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 8911e3e..52b3e85 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -132,7 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_simple_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -164,6 +164,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.seek(-13, 2)
self.assert_message_count([ message for message in consumer ], 13)
+ # Set absolute offset
+ consumer.seek(100)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(100, partition=0)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(101, partition=1)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(90, partition=0)
+ self.assert_message_count([ message for message in consumer ], 10)
+ consumer.seek(20, partition=1)
+ self.assert_message_count([ message for message in consumer ], 80)
+ consumer.seek(0, partition=1)
+ self.assert_message_count([ message for message in consumer ], 100)
+
consumer.stop()
@kafka_versions("all")
@@ -276,7 +290,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_multi_process_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -342,7 +356,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
big_consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -369,7 +383,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_multi_process_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -477,7 +491,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
def test_kafka_consumer__offset_commit_resume(self):
GROUP_ID = random_string(10).encode('utf-8')
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 91e22cf..91779d7 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -2,8 +2,6 @@ import logging
import os
import time
-from . import unittest
-
from kafka import KafkaClient, SimpleConsumer, KeyedProducer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer
@@ -15,6 +13,9 @@ from test.testutil import (
)
+log = logging.getLogger(__name__)
+
+
class TestFailover(KafkaIntegrationTestCase):
create_client = False
@@ -23,10 +24,10 @@ class TestFailover(KafkaIntegrationTestCase):
return
zk_chroot = random_string(10)
- replicas = 2
- partitions = 2
+ replicas = 3
+ partitions = 3
- # mini zookeeper, 2 kafka brokers
+ # mini zookeeper, 3 kafka brokers
self.zk = ZookeeperFixture.instance()
kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
@@ -73,12 +74,12 @@ class TestFailover(KafkaIntegrationTestCase):
timeout = 60
while not recovered and (time.time() - started) < timeout:
try:
- logging.debug("attempting to send 'success' message after leader killed")
+ log.debug("attempting to send 'success' message after leader killed")
producer.send_messages(topic, partition, b'success')
- logging.debug("success!")
+ log.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError):
- logging.debug("caught exception sending message -- will retry")
+ log.debug("caught exception sending message -- will retry")
continue
# Verify we successfully sent the message
@@ -89,7 +90,9 @@ class TestFailover(KafkaIntegrationTestCase):
# count number of messages
# Should be equal to 100 before + 1 recovery + 100 after
- self.assert_message_count(topic, 201, partitions=(partition,))
+ # at_least=True because exactly once delivery isn't really a thing
+ self.assert_message_count(topic, 201, partitions=(partition,),
+ at_least=True)
@kafka_versions("all")
def test_switch_leader_async(self):
@@ -110,7 +113,7 @@ class TestFailover(KafkaIntegrationTestCase):
# kill leader for partition
self._kill_leader(topic, partition)
- logging.debug("attempting to send 'success' message after leader killed")
+ log.debug("attempting to send 'success' message after leader killed")
# in async mode, this should return immediately
producer.send_messages(topic, partition, b'success')
@@ -133,6 +136,7 @@ class TestFailover(KafkaIntegrationTestCase):
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after
+ # at_least=True because exactly once delivery isn't really a thing
self.assert_message_count(topic, 21, partitions=(partition,),
at_least=True)
self.assert_message_count(topic, 21, partitions=(partition + 1,),
@@ -164,7 +168,7 @@ class TestFailover(KafkaIntegrationTestCase):
if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
recovered = True
except (FailedPayloadsError, ConnectionError):
- logging.debug("caught exception sending message -- will retry")
+ log.debug("caught exception sending message -- will retry")
continue
# Verify we successfully sent the message
@@ -187,12 +191,16 @@ class TestFailover(KafkaIntegrationTestCase):
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
- logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
msg = 'msg {0}: {1}'.format(j, random_string(10))
- resp = producer.send_messages(topic, partition, msg.encode('utf-8'))
- if len(resp) > 0:
- self.assertEqual(resp[0].error, 0)
- logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
+ log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
+ while True:
+ try:
+ producer.send_messages(topic, partition, msg.encode('utf-8'))
+ except:
+ log.exception('failure in _send_random_messages - retrying')
+ continue
+ else:
+ break
def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]