27 files changed, 718 insertions, 168 deletions
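The commit below is the kafka-python 0.9.4 release. Its usage documentation (see the docs/usage.rst hunk further down) standardizes on byte strings for topics, keys, and messages; as a quick orientation, a minimal produce/consume round trip against that API might look like the sketch below. The broker address, topic, and group names are placeholder assumptions.

.. code:: python

    # Minimal sketch based on the updated usage docs in this commit (kafka-python 0.9.4 API).
    # 'localhost:9092', 'my-topic', and 'my_group' are placeholder values.
    import logging

    from kafka import KafkaClient, SimpleProducer, KafkaConsumer

    logging.basicConfig(level=logging.INFO)

    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)

    # The application is responsible for encoding messages to bytes
    producer.send_messages(b'my-topic', b'some message')
    producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))

    consumer = KafkaConsumer('my-topic',
                             group_id='my_group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # message.value is a raw byte string -- decode if necessary
        logging.info('%s:%d:%d: key=%s value=%s',
                     message.topic, message.partition, message.offset,
                     message.key, message.value)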
diff --git a/.travis.yml b/.travis.yml index 7184bc8..136c19f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ env: - KAFKA_VERSION=0.8.0 - KAFKA_VERSION=0.8.1 - KAFKA_VERSION=0.8.1.1 - - KAFKA_VERSION=0.8.2.0 + - KAFKA_VERSION=0.8.2.1 before_install: - sudo apt-get install libsnappy-dev @@ -1,16 +1,19 @@ # Contributors -Top 10 contributors, listed by contribution. See https://github.com/mumrah/kafka-python/graphs/contributors for the full list +Top contributors, listed by contribution. See https://github.com/mumrah/kafka-python/graphs/contributors for the full list * David Arthur, [@mumrah](https://github.com/mumrah) * Dana Powers, [@dpkp](https://github.com/dpkp) * Mahendra M, [@mahendra](https://github.com/mahendra) * Mark Roberts, [@wizzat](https://github.com/wizzat) * Omar, [@rdiomar](https://github.com/rdiomar) - RIP, Omar. 2014 +* Viktor Shlapakov, [@vshlapakov](https://github.com/vshlapakov) * Bruno Renié, [@brutasse](https://github.com/brutasse) * Marc Labbé, [@mrtheb](https://github.com/mrtheb) +* John Anderson, [@sontek](https://github.com/sontek) * Ivan Pouzyrevsky, [@sandello](https://github.com/sandello) * Thomas Dimson, [@cosbynator](https://github.com/cosbynator) -* Zack Dever, [@zever](https://github.com/zever) +* Alex Couture-Beil, [@alexcb](https://github.com/alexcb) +* Zack Dever, [@zackdever](https://github.com/zackdever) Thanks to all who have contributed! @@ -1,3 +1,72 @@ +# 0.9.4 (June 11, 2015) + +Consumers +* Refactor SimpleConsumer internal fetch handling (dpkp PR 399) +* Handle exceptions in SimpleConsumer commit() and reset_partition_offset() (dpkp PR 404) +* Improve FailedPayloadsError handling in KafkaConsumer (dpkp PR 398) +* KafkaConsumer: avoid raising KeyError in task_done (dpkp PR 389) +* MultiProcessConsumer -- support configured partitions list (dpkp PR 380) +* Fix SimpleConsumer leadership change handling (dpkp PR 393) +* Fix SimpleConsumer connection error handling (reAsOn2010 PR 392) +* Improve Consumer handling of 'falsy' partition values (wting PR 342) +* Fix _offsets call error in KafkaConsumer (hellais PR 376) +* Fix str/bytes bug in KafkaConsumer (dpkp PR 365) +* Register atexit handlers for consumer and producer thread/multiprocess cleanup (dpkp PR 360) +* Always fetch commit offsets in base consumer unless group is None (dpkp PR 356) +* Stop consumer threads on delete (dpkp PR 357) +* Deprecate metadata_broker_list in favor of bootstrap_servers in KafkaConsumer (dpkp PR 340) +* Support pass-through parameters in multiprocess consumer (scrapinghub PR 336) +* Enable offset commit on SimpleConsumer.seek (ecanzonieri PR 350) +* Improve multiprocess consumer partition distribution (scrapinghub PR 335) +* Ignore messages with offset less than requested (wkiser PR 328) +* Handle OffsetOutOfRange in SimpleConsumer (ecanzonieri PR 296) + +Producers +* Add Murmur2Partitioner (dpkp PR 378) +* Log error types in SimpleProducer and SimpleConsumer (dpkp PR 405) +* SimpleProducer support configuration of fail_on_error (dpkp PR 396) +* Deprecate KeyedProducer.send() (dpkp PR 379) +* Further improvements to async producer code (dpkp PR 388) +* Add more configuration parameters for async producer (dpkp) +* Deprecate SimpleProducer batch_send=True in favor of async (dpkp) +* Improve async producer error handling and retry logic (vshlapakov PR 331) +* Support message keys in async producer (vshlapakov PR 329) +* Use threading instead of multiprocessing for Async Producer (vshlapakov PR 330) +* Stop threads on __del__ (chmduquesne PR 324) +* Fix 
leadership failover handling in KeyedProducer (dpkp PR 314) + +KafkaClient +* Add .topics property for list of known topics (dpkp) +* Fix request / response order guarantee bug in KafkaClient (dpkp PR 403) +* Improve KafkaClient handling of connection failures in _get_conn (dpkp) +* Client clears local metadata cache before updating from server (dpkp PR 367) +* KafkaClient should return a response or error for each request - enable better retry handling (dpkp PR 366) +* Improve str/bytes conversion in KafkaClient and KafkaConsumer (dpkp PR 332) +* Always return sorted partition ids in client.get_partition_ids_for_topic() (dpkp PR 315) + +Documentation +* Cleanup Usage Documentation +* Improve KafkaConsumer documentation (dpkp PR 341) +* Update consumer documentation (sontek PR 317) +* Add doc configuration for tox (sontek PR 316) +* Switch to .rst doc format (sontek PR 321) +* Fixup google groups link in README (sontek PR 320) +* Automate documentation at kafka-python.readthedocs.org + +Internals +* Switch integration testing from 0.8.2.0 to 0.8.2.1 (dpkp PR 402) +* Fix most flaky tests, improve debug logging, improve fixture handling (dpkp) +* General style cleanups (dpkp PR 394) +* Raise error on duplicate topic-partition payloads in protocol grouping (dpkp) +* Use module-level loggers instead of simply 'kafka' (dpkp) +* Remove pkg_resources check for __version__ at runtime (dpkp PR 387) +* Make external API consistently support python3 strings for topic (kecaps PR 361) +* Fix correlation id overflow (dpkp PR 355) +* Cleanup kafka/common structs (dpkp PR 338) +* Use context managers in gzip_encode / gzip_decode (dpkp PR 337) +* Save failed request as FailedPayloadsError attribute (jobevers PR 302) +* Remove unused kafka.queue (mumrah) + # 0.9.3 (Feb 3, 2015) * Add coveralls.io support (sontek PR 307) @@ -32,11 +32,11 @@ Copyright 2015, David Arthur under Apache License, v2.0. See `LICENSE` Status ---------- -The current stable version of this package is `0.9.3`_ and is compatible with: +The current stable version of this package is `0.9.4`_ and is compatible with: Kafka broker versions -- 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] +- 0.8.2.1 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] - 0.8.1.1 - 0.8.1 - 0.8.0 @@ -47,7 +47,7 @@ Python versions - 2.7 (tested on 2.7.9) - 3.3 (tested on 3.3.5) - 3.4 (tested on 3.4.2) -- pypy (tested on pypy 2.4.0 / python 2.7.8) +- pypy (tested on pypy 2.5.0 / python 2.7.8) .. _Full documentation available on ReadTheDocs: http://kafka-python.readthedocs.org/en/latest/ -.. _0.9.3: https://github.com/mumrah/kafka-python/releases/tag/v0.9.3 +.. _0.9.4: https://github.com/mumrah/kafka-python/releases/tag/v0.9.4 diff --git a/build_integration.sh b/build_integration.sh index 2b81745..5395bb8 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,7 +1,7 @@ #!/bin/bash # Versions available for testing via binary distributions -OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.0" +OFFICIAL_RELEASES="0.8.0 0.8.1 0.8.1.1 0.8.2.1" # Useful configuration vars, with sensible defaults if [ -z "$SCALA_VERSION" ]; then diff --git a/docs/conf.py b/docs/conf.py index ea223c2..2979560 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,11 +56,10 @@ copyright = u'2015, David Arthur' # built documents. # # The short X.Y version. 
-with open('../VERSION') as version_file: - version = version_file.read() +exec(open('../kafka/version.py').read()) # The full version, including alpha/beta/rc tags. -release = version +release = __version__ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/index.rst b/docs/index.rst index e4a9ac7..c499d4c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,11 +16,11 @@ see https://groups.google.com/forum/m/#!forum/kafka-clients Status ------ -The current stable version of this package is `0.9.3 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.3>`_ and is compatible with: +The current stable version of this package is `0.9.4 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.4>`_ and is compatible with: Kafka broker versions -* 0.8.2.0 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] +* 0.8.2.1 [offset management currently ZK only -- does not support ConsumerCoordinator offset management APIs] * 0.8.1.1 * 0.8.1 * 0.8.0 @@ -31,7 +31,7 @@ Python versions * 2.7 (tested on 2.7.9) * 3.3 (tested on 3.3.5) * 3.4 (tested on 3.4.2) -* pypy (tested on pypy 2.4.0 / python 2.7.8) +* pypy (tested on pypy 2.5.0 / python 2.7.8) License ------- diff --git a/docs/usage.rst b/docs/usage.rst index ca326d4..6417cd8 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -9,21 +9,24 @@ SimpleProducer from kafka import SimpleProducer, KafkaClient # To send messages synchronously - kafka = KafkaClient("localhost:9092") + kafka = KafkaClient('localhost:9092') producer = SimpleProducer(kafka) - # Note that the application is responsible for encoding messages to type str - producer.send_messages("my-topic", "some message") - producer.send_messages("my-topic", "this method", "is variadic") + # Note that the application is responsible for encoding messages to type bytes + producer.send_messages(b'my-topic', b'some message') + producer.send_messages(b'my-topic', b'this method', b'is variadic') # Send unicode message - producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) + producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8')) + +Asynchronous Mode +----------------- + +.. code:: python # To send messages asynchronously - # WARNING: current implementation does not guarantee message delivery on failure! - # messages can get dropped! Use at your own risk! Or help us improve with a PR! producer = SimpleProducer(kafka, async=True) - producer.send_messages("my-topic", "async message") + producer.send_messages(b'my-topic', b'async message') # To wait for acknowledgements # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to @@ -32,13 +35,12 @@ SimpleProducer # by all in sync replicas before sending a response producer = SimpleProducer(kafka, async=False, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=2000) - - response = producer.send_messages("my-topic", "another message") + ack_timeout=2000, + sync_fail_on_error=False) - if response: - print(response[0].error) - print(response[0].offset) + responses = producer.send_messages(b'my-topic', b'another message') + for r in responses: + logging.info(r.offset) # To send messages in batch. You can use any of the available # producers for doing this. The following producer will collect @@ -56,16 +58,21 @@ Keyed messages .. 
code:: python - from kafka import (KafkaClient, KeyedProducer, HashedPartitioner, - RoundRobinPartitioner) + from kafka import ( + KafkaClient, KeyedProducer, + Murmur2Partitioner, RoundRobinPartitioner) - kafka = KafkaClient("localhost:9092") + kafka = KafkaClient('localhost:9092') - # HashedPartitioner is default + # HashedPartitioner is default (currently uses python hash()) producer = KeyedProducer(kafka) - producer.send_messages("my-topic", "key1", "some message") - producer.send_messages("my-topic", "key2", "this methode") + producer.send_messages(b'my-topic', b'key1', b'some message') + producer.send_messages(b'my-topic', b'key2', b'this methode') + # Murmur2Partitioner attempts to mirror the java client hashing + producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner) + + # Or just produce round-robin (or just use SimpleProducer) producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) @@ -78,9 +85,9 @@ KafkaConsumer from kafka import KafkaConsumer # To consume messages - consumer = KafkaConsumer("my-topic", - group_id="my_group", - bootstrap_servers=["localhost:9092"]) + consumer = KafkaConsumer('my-topic', + group_id='my_group', + bootstrap_servers=['localhost:9092']) for message in consumer: # message value is raw byte string -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` @@ -88,8 +95,6 @@ KafkaConsumer message.offset, message.key, message.value)) - kafka.close() - messages (m) are namedtuples with attributes: @@ -121,16 +126,16 @@ messages (m) are namedtuples with attributes: # so it can be included in the next commit # # **messages that are not marked w/ task_done currently do not commit! - kafka.task_done(m) + consumer.task_done(m) # If auto_commit_enable is False, remember to commit() periodically - kafka.commit() + consumer.commit() # Batch process interface while True: for m in kafka.fetch_messages(): process_message(m) - kafka.task_done(m) + consumer.task_done(m) Configuration settings can be passed to constructor, @@ -162,13 +167,13 @@ Multiprocess consumer from kafka import KafkaClient, MultiProcessConsumer - kafka = KafkaClient("localhost:9092") + kafka = KafkaClient('localhost:9092') # This will split the number of partitions among two processes - consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) + consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2) # This will spawn processes such that each handles 2 partitions max - consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", + consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', partitions_per_proc=2) for message in consumer: @@ -186,14 +191,14 @@ Low level from kafka.protocol import KafkaProtocol from kafka.common import ProduceRequest - kafka = KafkaClient("localhost:9092") + kafka = KafkaClient('localhost:9092') - req = ProduceRequest(topic="my-topic", partition=1, - messages=[create_message("some message")]) + req = ProduceRequest(topic=b'my-topic', partition=1, + messages=[create_message(b'some message')]) resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) kafka.close() - resps[0].topic # "my-topic" + resps[0].topic # b'my-topic' resps[0].partition # 1 resps[0].error # 0 (hopefully) resps[0].offset # offset of the first message sent in this request diff --git a/kafka/client.py b/kafka/client.py index da86175..13777a4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -138,7 +138,8 @@ class KafkaClient(object): Arguments: payloads: list of object-like entities with a topic 
(str) and - partition (int) attribute + partition (int) attribute; payloads with duplicate topic-partitions + are not supported. encode_fn: a method to encode the list of payloads to a request body, must accept client_id, correlation_id, and payloads as @@ -152,20 +153,29 @@ class KafkaClient(object): List of response objects in the same order as the supplied payloads """ + # encoders / decoders do not maintain ordering currently + # so we need to keep this so we can rebuild order before returning + original_ordering = [(p.topic, p.partition) for p in payloads] + # Group the requests by topic+partition brokers_for_payloads = [] payloads_by_broker = collections.defaultdict(list) + responses = {} for payload in payloads: - leader = self._get_leader_for_partition(payload.topic, - payload.partition) - - payloads_by_broker[leader].append(payload) - brokers_for_payloads.append(leader) + try: + leader = self._get_leader_for_partition(payload.topic, + payload.partition) + payloads_by_broker[leader].append(payload) + brokers_for_payloads.append(leader) + except KafkaUnavailableError as e: + log.warning('KafkaUnavailableError attempting to send request ' + 'on topic %s partition %d', payload.topic, payload.partition) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) # For each broker, send the list of request payloads # and collect the responses and errors - responses_by_broker = collections.defaultdict(list) broker_failures = [] for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() @@ -184,7 +194,8 @@ class KafkaClient(object): 'to server %s: %s', requestId, broker, e) for payload in payloads: - responses_by_broker[broker].append(FailedPayloadsError(payload)) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) # No exception, try to get response else: @@ -196,7 +207,8 @@ class KafkaClient(object): log.debug('Request %s does not expect a response ' '(skipping conn.recv)', requestId) for payload in payloads: - responses_by_broker[broker].append(None) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = None continue try: @@ -208,12 +220,17 @@ class KafkaClient(object): requestId, broker, e) for payload in payloads: - responses_by_broker[broker].append(FailedPayloadsError(payload)) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) else: + _resps = [] for payload_response in decoder_fn(response): - responses_by_broker[broker].append(payload_response) - log.debug('Response %s: %s', requestId, responses_by_broker[broker]) + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) # Connection errors generally mean stale metadata # although sometimes it means incorrect api request @@ -223,9 +240,7 @@ class KafkaClient(object): self.reset_all_metadata() # Return responses in the same order as provided - responses_by_payload = [responses_by_broker[broker].pop(0) - for broker in brokers_for_payloads] - return responses_by_payload + return [responses[tp] for tp in original_ordering] def __repr__(self): return '<KafkaClient client_id=%s>' % (self.client_id) @@ -296,6 +311,10 @@ class KafkaClient(object): return sorted(list(self.topic_partitions[topic])) + @property + def topics(self): + return 
list(self.topic_partitions.keys()) + def ensure_topic_exists(self, topic, timeout = 30): start_time = time.time() @@ -349,8 +368,8 @@ class KafkaClient(object): resp = self.send_metadata_request(topics) - log.info('Updating broker metadata: %s', resp.brokers) - log.info('Updating topic metadata: %s', resp.topics) + log.debug('Updating broker metadata: %s', resp.brokers) + log.debug('Updating topic metadata: %s', resp.topics) self.brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) @@ -395,7 +414,7 @@ class KafkaClient(object): # this error code is provided for admin purposes only # we never talk to replicas, only the leader except ReplicaNotAvailableError: - log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) + log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) # If Known Broker, topic_partition -> BrokerMetadata if leader in self.brokers: diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 6365cfa..0800327 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -8,7 +8,7 @@ from threading import Lock import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError, check_error + UnknownTopicOrPartitionError, check_error, KafkaError ) from kafka.util import kafka_bytestring, ReentrantTimer @@ -114,12 +114,13 @@ class Consumer(object): self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): - """ - Commit offsets for this consumer + """Commit stored offsets to Kafka via OffsetCommitRequest (v0) Keyword Arguments: partitions (list): list of partitions to commit, default is to commit all of them + + Returns: True on success, False on failure """ # short circuit if nothing happened. 
This check is kept outside @@ -135,22 +136,27 @@ class Consumer(object): reqs = [] if partitions is None: # commit all partitions - partitions = self.offsets.keys() + partitions = list(self.offsets.keys()) + log.debug('Committing new offsets for %s, partitions %s', + self.topic, partitions) for partition in partitions: offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) + log.debug('Commit offset %d in SimpleConsumer: ' + 'group=%s, topic=%s, partition=%s', + offset, self.group, self.topic, partition) reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - kafka.common.check_error(resp) - - self.count_since_commit = 0 + try: + self.client.send_offset_commit_request(self.group, reqs) + except KafkaError as e: + log.error('%s saving offsets: %s', e.__class__.__name__, e) + return False + else: + self.count_since_commit = 0 + return True def _auto_commit(self): """ diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff..733baa8 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -126,8 +126,8 @@ class SimpleConsumer(Consumer): auto_commit_every_t=auto_commit_every_t) if max_buffer_size is not None and buffer_size > max_buffer_size: - raise ValueError("buffer_size (%d) is greater than " - "max_buffer_size (%d)" % + raise ValueError('buffer_size (%d) is greater than ' + 'max_buffer_size (%d)' % (buffer_size, max_buffer_size)) self.buffer_size = buffer_size self.max_buffer_size = max_buffer_size @@ -144,6 +144,13 @@ class SimpleConsumer(Consumer): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ class SimpleConsumer(Consumer): raise # send_offset_request - (resp, ) = self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """ @@ -174,33 +188,62 @@ class SimpleConsumer(Consumer): """ self.partition_info = True - def seek(self, offset, whence): + def seek(self, offset, whence=None, partition=None): """ Alter the current offset in the consumer, similar to fseek Arguments: offset: how much to modify the offset - whence: where to modify it from + whence: where to modify it from, default is None - * 0 is relative to the earliest 
available offset (head) - * 1 is relative to the current offset - * 2 is relative to the latest known offset (tail) + * None is an absolute offset + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) + + partition: modify which partition, default is None. + If partition is None, would modify all partitions. """ - if whence == 1: # relative to current position - for partition, _offset in self.offsets.items(): - self.offsets[partition] = _offset + offset + if whence is None: # set an absolute offset + if partition is None: + for tmp_partition in self.offsets: + self.offsets[tmp_partition] = offset + else: + self.offsets[partition] = offset + elif whence == 1: # relative to current position + if partition is None: + for tmp_partition, _offset in self.offsets.items(): + self.offsets[tmp_partition] = _offset + offset + else: + self.offsets[partition] += offset elif whence in (0, 2): # relative to beginning or end - # divide the request offset by number of partitions, - # distribute the remained evenly - (delta, rem) = divmod(offset, len(self.offsets)) - deltas = {} - for partition, r in izip_longest(self.offsets.keys(), - repeat(1, rem), fillvalue=0): - deltas[partition] = delta + r - reqs = [] - for partition in self.offsets.keys(): + deltas = {} + if partition is None: + # divide the request offset by number of partitions, + # distribute the remained evenly + (delta, rem) = divmod(offset, len(self.offsets)) + for tmp_partition, r in izip_longest(self.offsets.keys(), + repeat(1, rem), + fillvalue=0): + deltas[tmp_partition] = delta + r + + for tmp_partition in self.offsets.keys(): + if whence == 0: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -2, + 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -1, + 1)) + else: + pass + else: + deltas[partition] = offset if whence == 0: reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: @@ -213,7 +256,7 @@ class SimpleConsumer(Consumer): self.offsets[resp.partition] = \ resp.offsets[0] + deltas[resp.partition] else: - raise ValueError("Unexpected value for `whence`, %d" % whence) + raise ValueError('Unexpected value for `whence`, %d' % whence) # Reset queue and fetch offsets since they are invalid self.fetch_offsets = self.offsets.copy() @@ -236,35 +279,32 @@ class SimpleConsumer(Consumer): """ messages = [] if timeout is not None: - max_time = time.time() + timeout + timeout += time.time() new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - result = self._get_message(block, timeout, get_partition_info=True, + log.debug('getting %d messages', count) + while len(messages) < count: + block_time = timeout - time.time() + log.debug('calling _get_message block=%s timeout=%s', block, block_time) + result = self._get_message(block, block_time, + get_partition_info=True, update_offset=False) - if result: - partition, message = result - if self.partition_info: - messages.append(result) - else: - messages.append(message) - new_offsets[partition] = message.offset + 1 - count -= 1 - else: - # Ran out of messages for the last request. - if not block: - # If we're not blocking, break. 
- break + log.debug('got %s from _get_messages', result) + if not result: + if block and (timeout is None or time.time() <= timeout): + continue + break - # If we have a timeout, reduce it to the - # appropriate value - if timeout is not None: - timeout = max_time - time.time() + partition, message = result + _msg = (partition, message) if self.partition_info else message + messages.append(_msg) + new_offsets[partition] = message.offset + 1 # Update and commit offsets if necessary self.offsets.update(new_offsets) self.count_since_commit += len(messages) self._auto_commit() + log.debug('got %d messages: %s', len(messages), messages) return messages def get_message(self, block=True, timeout=0.1, get_partition_info=None): @@ -278,10 +318,16 @@ class SimpleConsumer(Consumer): If get_partition_info is True, returns (partition, message) If get_partition_info is False, returns message """ - if self.queue.empty(): + start_at = time.time() + while self.queue.empty(): # We're out of messages, go grab some more. + log.debug('internal queue empty, fetching more messages') with FetchContext(self, block, timeout): self._fetch() + + if not block or time.time() > (start_at + timeout): + break + try: partition, message = self.queue.get_nowait() @@ -300,6 +346,7 @@ class SimpleConsumer(Consumer): else: return message except Empty: + log.debug('internal queue empty after fetch - returning None') return None def __iter__(self): @@ -344,23 +391,26 @@ class SimpleConsumer(Consumer): try: check_error(resp) except UnknownTopicOrPartitionError: + log.error('UnknownTopicOrPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) raise except NotLeaderForPartitionError: + log.error('NotLeaderForPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) continue except OffsetOutOfRangeError: - log.warning("OffsetOutOfRangeError for %s - %d. " - "Resetting partition offset...", + log.warning('OffsetOutOfRangeError for %s:%d. 
' + 'Resetting partition offset...', resp.topic, resp.partition) self.reset_partition_offset(resp.partition) # Retry this partition retry_partitions[resp.partition] = partitions[resp.partition] continue except FailedPayloadsError as e: - log.warning("Failed payloads of %s" - "Resetting partition offset...", - e.payload) + log.warning('FailedPayloadsError for %s:%d', + e.payload.topic, e.payload.partition) # Retry this partition retry_partitions[e.payload.partition] = partitions[e.payload.partition] continue @@ -379,7 +429,7 @@ class SimpleConsumer(Consumer): except ConsumerFetchSizeTooSmall: if (self.max_buffer_size is not None and buffer_size == self.max_buffer_size): - log.error("Max fetch size %d too small", + log.error('Max fetch size %d too small', self.max_buffer_size) raise if self.max_buffer_size is None: @@ -387,12 +437,12 @@ class SimpleConsumer(Consumer): else: buffer_size = min(buffer_size * 2, self.max_buffer_size) - log.warn("Fetch size too small, increase to %d (2x) " - "and retry", buffer_size) + log.warning('Fetch size too small, increase to %d (2x) ' + 'and retry', buffer_size) retry_partitions[partition] = buffer_size except ConsumerNoMoreData as e: - log.debug("Iteration was ended by %r", e) + log.debug('Iteration was ended by %r', e) except StopIteration: # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) + log.debug('Done iterating over partition %s', partition) partitions = retry_partitions diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py index fdb19bb..5b6ac2d 100644 --- a/kafka/partitioner/__init__.py +++ b/kafka/partitioner/__init__.py @@ -1,6 +1,7 @@ from .roundrobin import RoundRobinPartitioner -from .hashed import HashedPartitioner +from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner __all__ = [ - 'RoundRobinPartitioner', 'HashedPartitioner' + 'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner', + 'LegacyPartitioner' ] diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index fb5e598..6393ce2 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -1,8 +1,26 @@ from .base import Partitioner -class HashedPartitioner(Partitioner): + +class Murmur2Partitioner(Partitioner): """ Implements a partitioner which selects the target partition based on + the hash of the key. Attempts to apply the same hashing + function as mainline java client. + """ + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions + + # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 + idx = (murmur2(key) & 0x7fffffff) % len(partitions) + + return partitions[idx] + + +class LegacyPartitioner(Partitioner): + """DEPRECATED -- See Issue 374 + + Implements a partitioner which selects the target partition based on the hash of the key """ def partition(self, key, partitions=None): @@ -12,3 +30,79 @@ class HashedPartitioner(Partitioner): idx = hash(key) % size return partitions[idx] + + +# Default will change to Murmur2 in 0.10 release +HashedPartitioner = LegacyPartitioner + + +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 +def murmur2(key): + """Pure-python Murmur2 implementation. 
+ + Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 + + Args: + key: if not a bytearray, converted via bytearray(str(key)) + + Returns: MurmurHash2 of key bytearray + """ + + # Convert key to a bytearray + if not isinstance(key, bytearray): + data = bytearray(str(key)) + + length = len(data) + seed = 0x9747b28c + # 'm' and 'r' are mixing constants generated offline. + # They're not really 'magic', they just happen to work well. + m = 0x5bd1e995 + r = 24 + + # Initialize the hash to a random value + h = seed ^ length + length4 = length / 4 + + for i in range(length4): + i4 = i * 4 + k = ((data[i4 + 0] & 0xff) + + ((data[i4 + 1] & 0xff) << 8) + + ((data[i4 + 2] & 0xff) << 16) + + ((data[i4 + 3] & 0xff) << 24)) + k &= 0xffffffff + k *= m + k &= 0xffffffff + k ^= (k % 0x100000000) >> r # k ^= k >>> r + k &= 0xffffffff + k *= m + k &= 0xffffffff + + h *= m + h &= 0xffffffff + h ^= k + h &= 0xffffffff + + # Handle the last few bytes of the input array + extra_bytes = length % 4 + if extra_bytes == 3: + h ^= (data[(length & ~3) + 2] & 0xff) << 16 + h &= 0xffffffff + + if extra_bytes == 2: + h ^= (data[(length & ~3) + 1] & 0xff) << 8 + h &= 0xffffffff + + if extra_bytes == 1: + h ^= (data[length & ~3] & 0xff) + h &= 0xffffffff + h *= m + h &= 0xffffffff + + h ^= (h % 0x100000000) >> 13 # h >>> 13; + h &= 0xffffffff + h *= m + h &= 0xffffffff + h ^= (h % 0x100000000) >> 15 # h >>> 15; + h &= 0xffffffff + + return h diff --git a/kafka/producer/base.py b/kafka/producer/base.py index bee1888..d5c013a 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -166,8 +166,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if error_cls: _handle_error(error_cls, orig_req) - log.error('Error sending ProduceRequest (#%d of %d) to %s:%d ' - 'with msgs %s', i + 1, len(requests), + log.error('%s sending ProduceRequest (#%d of %d) ' + 'to %s:%d with msgs %s', + error_cls.__name__, (i + 1), len(requests), orig_req.topic, orig_req.partition, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) diff --git a/kafka/version.py b/kafka/version.py index 5b721ed..cd64b48 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '0.9.4-dev' +__version__ = '0.9.5-dev' diff --git a/servers/0.8.0/resources/kafka.properties b/servers/0.8.0/resources/kafka.properties index c9fd552..b9f5c49 100644 --- a/servers/0.8.0/resources/kafka.properties +++ b/servers/0.8.0/resources/kafka.properties @@ -35,6 +35,10 @@ log.dirs={tmp_dir}/data num.partitions={partitions} default.replication.factor={replicas} +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + ############################# Log Flush Policy ############################# log.flush.interval.messages=10000 @@ -49,7 +53,11 @@ log.cleanup.interval.mins=1 ############################# Zookeeper ############################# zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter diff --git a/servers/0.8.1.1/resources/kafka.properties b/servers/0.8.1.1/resources/kafka.properties index a638f39..685aed1 100644 --- a/servers/0.8.1.1/resources/kafka.properties +++ b/servers/0.8.1.1/resources/kafka.properties @@ -63,6 
+63,10 @@ log.dirs={tmp_dir}/data num.partitions={partitions} default.replication.factor={replicas} +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync @@ -116,3 +120,5 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/0.8.1/resources/kafka.properties b/servers/0.8.1/resources/kafka.properties index 5d47520..76b0cb4 100644 --- a/servers/0.8.1/resources/kafka.properties +++ b/servers/0.8.1/resources/kafka.properties @@ -35,6 +35,10 @@ log.dirs={tmp_dir}/data num.partitions={partitions} default.replication.factor={replicas} +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + ############################# Log Flush Policy ############################# log.flush.interval.messages=10000 @@ -56,4 +60,8 @@ log.cleaner.enable=false # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/0.8.2.0/resources/kafka.properties b/servers/0.8.2.0/resources/kafka.properties index a638f39..685aed1 100644 --- a/servers/0.8.2.0/resources/kafka.properties +++ b/servers/0.8.2.0/resources/kafka.properties @@ -63,6 +63,10 @@ log.dirs={tmp_dir}/data num.partitions={partitions} default.replication.factor={replicas} +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync @@ -116,3 +120,5 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/0.8.2.1/resources/kafka.properties b/servers/0.8.2.1/resources/kafka.properties new file mode 100644 index 0000000..685aed1 --- /dev/null +++ b/servers/0.8.2.1/resources/kafka.properties @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. 
+ +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/0.8.2.1/resources/log4j.properties b/servers/0.8.2.1/resources/log4j.properties new file mode 100644 index 0000000..f863b3b --- /dev/null +++ b/servers/0.8.2.1/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.logger.kafka=DEBUG, stdout +log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout +log4j.logger.org.apache.zookeeper=INFO, stdout diff --git a/servers/0.8.2.1/resources/zookeeper.properties b/servers/0.8.2.1/resources/zookeeper.properties new file mode 100644 index 0000000..e3fd097 --- /dev/null +++ b/servers/0.8.2.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/test/fixtures.py b/test/fixtures.py index d4d03ee..164d0d7 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -4,6 +4,7 @@ import os.path import shutil import subprocess import tempfile +import time from six.moves import urllib import uuid @@ -125,12 +126,18 @@ class ZookeeperFixture(Fixture): # Party! self.out("Starting...") + timeout = 5 + max_timeout = 30 + backoff = 1 while True: self.child = SpawnedService(args, env) self.child.start() - if self.child.wait_for(r"binding to port", timeout=5): + timeout = min(timeout, max_timeout) + if self.child.wait_for(r"binding to port", timeout=timeout): break self.child.stop() + timeout *= 2 + time.sleep(backoff) self.out("Done!") def close(self): @@ -225,12 +232,19 @@ class KafkaFixture(Fixture): args = self.kafka_run_class_args("kafka.Kafka", properties) env = self.kafka_run_class_env() + timeout = 5 + max_timeout = 30 + backoff = 1 while True: self.child = SpawnedService(args, env) self.child.start() - if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5): + timeout = min(timeout, max_timeout) + if self.child.wait_for(r"\[Kafka Server %d\], Started" % + self.broker_id, timeout=timeout): break self.child.stop() + timeout *= 2 + time.sleep(backoff) self.out("Done!") self.running = True diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 585123b..8853350 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -2,8 +2,9 @@ import os from kafka.common import ( FetchRequest, OffsetCommitRequest, OffsetFetchRequest, - KafkaTimeoutError + KafkaTimeoutError, ProduceRequest ) +from kafka.protocol import create_message from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions @@ -49,11 +50,40 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): with self.assertRaises(KafkaTimeoutError): self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0) + @kafka_versions('all') + def test_send_produce_request_maintains_request_response_order(self): + + self.client.ensure_topic_exists(b'foo') + self.client.ensure_topic_exists(b'bar') + + requests = [ + ProduceRequest( + b'foo', 0, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'foo', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 0, + [create_message(b'a'), create_message(b'b')]), + ] + + responses = self.client.send_produce_request(requests) + while len(responses): + request = requests.pop() + response = responses.pop() + self.assertEqual(request.topic, response.topic) + self.assertEqual(request.partition, response.partition) + + #################### # Offset Tests # #################### - @kafka_versions("0.8.1", 
"0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_commit_fetch_offsets(self): req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata") (resp,) = self.client.send_offset_commit_request(b"group", [req]) diff --git a/test/test_consumer.py b/test/test_consumer.py index 08fd620..df15115 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -4,7 +4,7 @@ from . import unittest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer from kafka.common import ( - KafkaConfigurationError, FetchResponse, + KafkaConfigurationError, FetchResponse, OffsetFetchResponse, FailedPayloadsError, OffsetAndMessage, NotLeaderForPartitionError, UnknownTopicOrPartitionError ) @@ -25,10 +25,11 @@ class TestMultiProcessConsumer(unittest.TestCase): client = MagicMock() partitions = (0,) with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets: - consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) + MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member +class TestSimpleConsumer(unittest.TestCase): def test_simple_consumer_failed_payloads(self): client = MagicMock() consumer = SimpleConsumer(client, group=None, @@ -80,6 +81,45 @@ class TestMultiProcessConsumer(unittest.TestCase): with self.assertRaises(UnknownTopicOrPartitionError): consumer.get_messages(20) + def test_simple_consumer_commit_does_not_raise(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + + def mock_offset_fetch_request(group, payloads, **kwargs): + return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads] + + client.send_offset_fetch_request.side_effect = mock_offset_fetch_request + + def mock_offset_commit_request(group, payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_commit_request.side_effect = mock_offset_commit_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock internal commit check + consumer.count_since_commit = 10 + + # This should not raise an exception + self.assertFalse(consumer.commit(partitions=[0, 1])) + + def test_simple_consumer_reset_partition_offset(self): + client = MagicMock() + + def mock_offset_request(payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_request.side_effect = mock_offset_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # This should not raise an exception + self.assertEqual(consumer.reset_partition_offset(0), None) + @staticmethod def fail_requests_factory(error_factory): # Mock so that only the first request gets a valid response diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 8911e3e..52b3e85 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -132,7 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ 
-164,6 +164,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.seek(-13, 2) self.assert_message_count([ message for message in consumer ], 13) + # Set absolute offset + consumer.seek(100) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(100, partition=0) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(101, partition=1) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(90, partition=0) + self.assert_message_count([ message for message in consumer ], 10) + consumer.seek(20, partition=1) + self.assert_message_count([ message for message in consumer ], 80) + consumer.seek(0, partition=1) + self.assert_message_count([ message for message in consumer ], 100) + consumer.stop() @kafka_versions("all") @@ -276,7 +290,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -342,7 +356,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -369,7 +383,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_multi_process_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -477,7 +491,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10).encode('utf-8') diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 91e22cf..91779d7 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -2,8 +2,6 @@ import logging import os import time -from . 
import unittest - from kafka import KafkaClient, SimpleConsumer, KeyedProducer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer.base import Producer @@ -15,6 +13,9 @@ from test.testutil import ( ) +log = logging.getLogger(__name__) + + class TestFailover(KafkaIntegrationTestCase): create_client = False @@ -23,10 +24,10 @@ class TestFailover(KafkaIntegrationTestCase): return zk_chroot = random_string(10) - replicas = 2 - partitions = 2 + replicas = 3 + partitions = 3 - # mini zookeeper, 2 kafka brokers + # mini zookeeper, 3 kafka brokers self.zk = ZookeeperFixture.instance() kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] @@ -73,12 +74,12 @@ class TestFailover(KafkaIntegrationTestCase): timeout = 60 while not recovered and (time.time() - started) < timeout: try: - logging.debug("attempting to send 'success' message after leader killed") + log.debug("attempting to send 'success' message after leader killed") producer.send_messages(topic, partition, b'success') - logging.debug("success!") + log.debug("success!") recovered = True except (FailedPayloadsError, ConnectionError): - logging.debug("caught exception sending message -- will retry") + log.debug("caught exception sending message -- will retry") continue # Verify we successfully sent the message @@ -89,7 +90,9 @@ class TestFailover(KafkaIntegrationTestCase): # count number of messages # Should be equal to 100 before + 1 recovery + 100 after - self.assert_message_count(topic, 201, partitions=(partition,)) + # at_least=True because exactly once delivery isn't really a thing + self.assert_message_count(topic, 201, partitions=(partition,), + at_least=True) @kafka_versions("all") def test_switch_leader_async(self): @@ -110,7 +113,7 @@ class TestFailover(KafkaIntegrationTestCase): # kill leader for partition self._kill_leader(topic, partition) - logging.debug("attempting to send 'success' message after leader killed") + log.debug("attempting to send 'success' message after leader killed") # in async mode, this should return immediately producer.send_messages(topic, partition, b'success') @@ -133,6 +136,7 @@ class TestFailover(KafkaIntegrationTestCase): # count number of messages # Should be equal to 10 before + 1 recovery + 10 after + # at_least=True because exactly once delivery isn't really a thing self.assert_message_count(topic, 21, partitions=(partition,), at_least=True) self.assert_message_count(topic, 21, partitions=(partition + 1,), @@ -164,7 +168,7 @@ class TestFailover(KafkaIntegrationTestCase): if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0: recovered = True except (FailedPayloadsError, ConnectionError): - logging.debug("caught exception sending message -- will retry") + log.debug("caught exception sending message -- will retry") continue # Verify we successfully sent the message @@ -187,12 +191,16 @@ class TestFailover(KafkaIntegrationTestCase): def _send_random_messages(self, producer, topic, partition, n): for j in range(n): - logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) msg = 'msg {0}: {1}'.format(j, random_string(10)) - resp = producer.send_messages(topic, partition, msg.encode('utf-8')) - if len(resp) > 0: - self.assertEqual(resp[0].error, 0) - logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j) + log.debug('_send_random_message %s to %s:%d', msg, topic, partition) + while True: + try: + 
producer.send_messages(topic, partition, msg.encode('utf-8')) + except: + log.exception('failure in _send_random_messages - retrying') + continue + else: + break def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
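The partitioner changes above introduce Murmur2Partitioner, which mirrors the java client's key hashing, while HashedPartitioner stays aliased to the legacy python hash() partitioner until the 0.10 release. A short sketch of keyed production with the new partitioner follows; the broker address, topic, and keys are placeholders.

.. code:: python

    from kafka import KafkaClient, KeyedProducer, Murmur2Partitioner

    kafka = KafkaClient('localhost:9092')

    # Partition index follows the java client:
    #   idx = (murmur2(key) & 0x7fffffff) % len(partitions)
    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
    producer.send_messages(b'my-topic', b'key1', b'some message')
    producer.send_messages(b'my-topic', b'key2', b'another message')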
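SimpleConsumer.seek() gains whence=None (absolute offsets) and an optional partition argument in this commit, as exercised by the integration test above. A sketch of the extended call forms, again with placeholder broker, topic, and group names:

.. code:: python

    from kafka import KafkaClient, SimpleConsumer

    kafka = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(kafka, b'my-group', b'my-topic')

    consumer.seek(0)                 # whence=None: absolute offset, all partitions
    consumer.seek(100, partition=0)  # absolute offset on a single partition
    consumer.seek(5, 1)              # relative to the current offset
    consumer.seek(-13, 2)            # relative to the latest known offset (tail)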
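Consumer.commit() and SimpleConsumer.reset_partition_offset() now log failures and return False / None instead of raising on KafkaError, per the consumer changes and unit tests above. A sketch of checking the new commit() return value in a manual-commit loop (broker address and names are placeholders):

.. code:: python

    import logging

    from kafka import KafkaClient, SimpleConsumer

    log = logging.getLogger(__name__)

    kafka = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(kafka, b'my-group', b'my-topic', auto_commit=False)

    for message in consumer:
        log.info('consumed offset %d', message.offset)
        # commit() returns True on success, False if the OffsetCommitRequest failed;
        # local offsets are kept, so a later commit() call can retry.
        if not consumer.commit():
            log.warning('offset commit failed -- will retry later')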