diff options
-rw-r--r-- | MANIFEST.in | 1 | ||||
-rw-r--r-- | VERSION | 1 | ||||
-rw-r--r-- | docs/usage.rst | 6 | ||||
-rw-r--r-- | kafka/__init__.py | 4 | ||||
-rw-r--r-- | kafka/client.py | 9 | ||||
-rw-r--r-- | kafka/common.py | 32 | ||||
-rw-r--r-- | kafka/conn.py | 3 | ||||
-rw-r--r-- | kafka/consumer/base.py | 3 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 4 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 13 | ||||
-rw-r--r-- | kafka/producer/base.py | 285 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 34 | ||||
-rw-r--r-- | kafka/producer/simple.py | 23 | ||||
-rw-r--r-- | kafka/protocol.py | 3 | ||||
-rw-r--r-- | kafka/util.py | 3 | ||||
-rw-r--r-- | kafka/version.py | 1 | ||||
-rw-r--r-- | setup.py | 6 | ||||
-rw-r--r-- | test/fixtures.py | 42 | ||||
-rw-r--r-- | test/service.py | 58 | ||||
-rw-r--r-- | test/test_codec.py | 12 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 2 | ||||
-rw-r--r-- | test/test_failover_integration.py | 62 | ||||
-rw-r--r-- | test/test_producer.py | 127 | ||||
-rw-r--r-- | test/test_producer_integration.py | 46 | ||||
-rw-r--r-- | test/test_util.py | 6 | ||||
-rw-r--r-- | test/testutil.py | 5 | ||||
-rw-r--r-- | tox.ini | 1 |
27 files changed, 605 insertions, 187 deletions
diff --git a/MANIFEST.in b/MANIFEST.in index 68bd793..bdd6505 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1 @@ -include VERSION recursive-include kafka *.py diff --git a/VERSION b/VERSION deleted file mode 100644 index 8caff32..0000000 --- a/VERSION +++ /dev/null @@ -1 +0,0 @@ -0.9.4-dev diff --git a/docs/usage.rst b/docs/usage.rst index 150d121..ca326d4 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -47,7 +47,7 @@ SimpleProducer # Notes: # * If the producer dies before the messages are sent, there will be losses # * Call producer.stop() to send the messages and cleanup - producer = SimpleProducer(kafka, batch_send=True, + producer = SimpleProducer(kafka, async=True, batch_send_every_n=20, batch_send_every_t=60) @@ -63,8 +63,8 @@ Keyed messages # HashedPartitioner is default producer = KeyedProducer(kafka) - producer.send("my-topic", "key1", "some message") - producer.send("my-topic", "key2", "this methode") + producer.send_messages("my-topic", "key1", "some message") + producer.send_messages("my-topic", "key2", "this methode") producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) diff --git a/kafka/__init__.py b/kafka/__init__.py index 3536084..396a8b8 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -1,7 +1,5 @@ __title__ = 'kafka' -# Use setuptools to get version from setup.py -import pkg_resources -__version__ = pkg_resources.require('kafka-python')[0].version +from .version import __version__ __author__ = 'David Arthur' __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' diff --git a/kafka/client.py b/kafka/client.py index 6ef9d83..63b33b3 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -16,7 +16,8 @@ from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SE from kafka.protocol import KafkaProtocol from kafka.util import kafka_bytestring -log = logging.getLogger("kafka") + +log = logging.getLogger(__name__) class KafkaClient(object): @@ -168,19 +169,19 @@ class KafkaClient(object): responses_by_broker = collections.defaultdict(list) broker_failures = [] for broker, payloads in payloads_by_broker.items(): - conn = self._get_conn(broker.host.decode('utf-8'), broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) # Send the request, recv the response try: + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) conn.send(requestId, request) except ConnectionError as e: broker_failures.append(broker) log.warning("Could not send request [%s] to server %s: %s", - binascii.b2a_hex(request), conn, e) + binascii.b2a_hex(request), broker, e) for payload in payloads: responses_by_broker[broker].append(FailedPayloadsError(payload)) @@ -422,6 +423,8 @@ class KafkaClient(object): Arguments: payloads (list of ProduceRequest): produce requests to send to kafka + ProduceRequest payloads must not contain duplicates for any + topic-partition. acks (int, optional): how many acks the servers should receive from replica brokers before responding to the request. If it is 0, the server will not send any response. If it is 1, the server will wait diff --git a/kafka/common.py b/kafka/common.py index 8207bec..66987ff 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -71,6 +71,11 @@ TopicAndPartition = namedtuple("TopicAndPartition", KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) +# Define retry policy for async producer +# Limit value: int >= 0, 0 means no retries +RetryOptions = namedtuple("RetryOptions", + ["limit", "backoff_ms", "retry_on_timeouts"]) + ################# # Exceptions # @@ -164,9 +169,9 @@ class KafkaTimeoutError(KafkaError): class FailedPayloadsError(KafkaError): - def __init__(self, failed_payloads, *args): + def __init__(self, payload, *args): super(FailedPayloadsError, self).__init__(*args) - self.failed_payloads = failed_payloads + self.payload = payload class ConnectionError(KafkaError): @@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError): pass +class AsyncProducerQueueFull(KafkaError): + def __init__(self, failed_msgs, *args): + super(AsyncProducerQueueFull, self).__init__(*args) + self.failed_msgs = failed_msgs + + def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: @@ -215,6 +226,23 @@ kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) def check_error(response): + if isinstance(response, Exception): + raise response if response.error: error_class = kafka_errors.get(response.error, UnknownError) raise error_class(response) + + +RETRY_BACKOFF_ERROR_TYPES = ( + KafkaUnavailableError, LeaderNotAvailableError, + ConnectionError, FailedPayloadsError +) + + +RETRY_REFRESH_ERROR_TYPES = ( + NotLeaderForPartitionError, UnknownTopicOrPartitionError, + LeaderNotAvailableError, ConnectionError +) + + +RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES diff --git a/kafka/conn.py b/kafka/conn.py index ea55481..7a49d8c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -9,7 +9,8 @@ import six from kafka.common import ConnectionError -log = logging.getLogger("kafka") + +log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index f53217f..6365cfa 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -13,7 +13,8 @@ from kafka.common import ( from kafka.util import kafka_bytestring, ReentrantTimer -log = logging.getLogger("kafka") + +log = logging.getLogger('kafka.consumer') AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 891af04..8cec92d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -18,9 +18,11 @@ from .base import ( ) from .simple import Consumer, SimpleConsumer + +log = logging.getLogger(__name__) + Events = namedtuple("Events", ["start", "pause", "exit"]) -log = logging.getLogger("kafka") def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index ae00dab..88eeada 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -19,7 +19,7 @@ from kafka.common import ( FetchRequest, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, - OffsetOutOfRangeError, check_error + OffsetOutOfRangeError, FailedPayloadsError, check_error ) from .base import ( Consumer, @@ -34,7 +34,9 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) -log = logging.getLogger("kafka") + +log = logging.getLogger(__name__) + class FetchContext(object): """ @@ -353,6 +355,13 @@ class SimpleConsumer(Consumer): # Retry this partition retry_partitions[resp.partition] = partitions[resp.partition] continue + except FailedPayloadsError as e: + log.warning("Failed payloads of %s" + "Resetting partition offset...", + e.payload) + # Retry this partition + retry_partitions[e.payload.partition] = partitions[e.payload.partition] + continue partition = resp.partition buffer_size = partitions[partition] diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de4..18af342 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import logging import time try: - from queue import Empty, Queue + from queue import Empty, Full, Queue except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Full, Queue from collections import defaultdict from threading import Thread, Event @@ -15,40 +15,97 @@ from threading import Thread, Event import six from kafka.common import ( - ProduceRequest, TopicAndPartition, UnsupportedCodecError + ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions, + kafka_errors, UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull, UnknownError, + RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES ) + from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring -log = logging.getLogger("kafka") +log = logging.getLogger('kafka.producer') BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 +# unlimited +ASYNC_QUEUE_MAXSIZE = 0 +ASYNC_QUEUE_PUT_TIMEOUT = 0 +# unlimited retries by default +ASYNC_RETRY_LIMIT = None +ASYNC_RETRY_BACKOFF_MS = 100 +ASYNC_RETRY_ON_TIMEOUTS = True +ASYNC_LOG_MESSAGES_ON_ERROR = True + STOP_ASYNC_PRODUCER = -1 +ASYNC_STOP_TIMEOUT_SECS = 30 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, stop_event): - """ - Listen on the queue for a specified number of messages or till - a specified timeout and send them upstream to the brokers in one - request + req_acks, ack_timeout, retry_options, stop_event, + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + stop_timeout=ASYNC_STOP_TIMEOUT_SECS): + """Private method to manage producing messages asynchronously + + Listens on the queue for a specified number of messages or until + a specified timeout and then sends messages to the brokers in grouped + requests (one per broker). + + Messages placed on the queue should be tuples that conform to this format: + ((topic, partition), message, key) + + Currently does not mark messages with task_done. Do not attempt to join()! + + Arguments: + queue (threading.Queue): the queue from which to get messages + client (KafkaClient): instance to use for communicating with brokers + codec (kafka.protocol.ALL_CODECS): compression codec to use + batch_time (int): interval in seconds to send message batches + batch_size (int): count of messages that will trigger an immediate send + req_acks: required acks to use with ProduceRequests. see server protocol + ack_timeout: timeout to wait for required acks. see server protocol + retry_options (RetryOptions): settings for retry limits, backoff etc + stop_event (threading.Event): event to monitor for shutdown signal. + when this event is 'set', the producer will stop sending messages. + log_messages_on_error (bool, optional): log stringified message-contents + on any produce error, otherwise only log a hash() of the contents, + defaults to True. + stop_timeout (int or float, optional): number of seconds to continue + retrying messages after stop_event is set, defaults to 30. """ - stop = False + request_tries = {} + client.reinit() + stop_at = None + + while not (stop_event.is_set() and queue.empty() and not request_tries): + + # Handle stop_timeout + if stop_event.is_set(): + if not stop_at: + stop_at = stop_timeout + time.time() + if time.time() > stop_at: + log.debug('Async producer stopping due to stop_timeout') + break - while not stop_event.is_set(): timeout = batch_time count = batch_size send_at = time.time() + timeout msgset = defaultdict(list) + # Merging messages will require a bit more work to manage correctly + # for now, dont look for new batches if we have old ones to retry + if request_tries: + count = 0 + log.debug('Skipping new batch collection to handle retries') + else: + log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout)) + # Keep fetching till we gather enough messages or a # timeout is reached while count > 0 and timeout >= 0: try: topic_partition, msg, key = queue.get(timeout=timeout) - except Empty: break @@ -63,20 +120,84 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, msgset[topic_partition].append((msg, key)) # Send collected requests upstream - reqs = [] for topic_partition, msg in msgset.items(): messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, topic_partition.partition, - messages) - reqs.append(req) - - try: - client.send_produce_request(reqs, - acks=req_acks, - timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + tuple(messages)) + request_tries[req] = 0 + + if not request_tries: + continue + + reqs_to_retry, error_cls = [], None + retry_state = { + 'do_backoff': False, + 'do_refresh': False + } + + def _handle_error(error_cls, request): + if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + reqs_to_retry.append(request) + if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): + retry_state['do_backoff'] |= True + if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): + retry_state['do_refresh'] |= True + + reply = client.send_produce_request(request_tries.keys(), + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + for i, response in enumerate(reply): + error_cls = None + if isinstance(response, FailedPayloadsError): + error_cls = response.__class__ + orig_req = response.payload + + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + orig_req = request_tries.keys()[i] + + if error_cls: + _handle_error(error_cls, orig_req) + log.error('Error sending ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + + if not reqs_to_retry: + request_tries = {} + continue + + # doing backoff before next retry + if retry_state['do_backoff'] and retry_options.backoff_ms: + log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + + # refresh topic metadata before next retry + if retry_state['do_refresh']: + log.warn('Async producer forcing metadata refresh metadata before retrying') + client.load_metadata_for_topics() + + # Apply retry limit, dropping messages that are over + request_tries = dict( + (key, count + 1) + for (key, count) in request_tries.items() + if key in reqs_to_retry + and (retry_options.limit is None + or (count < retry_options.limit)) + ) + + # Log messages we are going to retry + for orig_req in request_tries.keys(): + log.info('Retrying ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + + if request_tries or not queue.empty(): + log.error('Stopped producer with {0} unsent messages' + .format(len(request_tries) + queue.qsize())) class Producer(object): @@ -84,42 +205,71 @@ class Producer(object): Base class to be used by producers Arguments: - client: The Kafka client instance to use - async: If set to true, the messages are sent asynchronously via another - thread (process). We will not wait for a response to these - WARNING!!! current implementation of async producer does not - guarantee message delivery. Use at your own risk! Or help us - improve with a PR! - req_acks: A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout: Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send: If True, messages are send in batches - batch_send_every_n: If set, messages are send in batches of this size - batch_send_every_t: If set, messages are send after this timeout + client (KafkaClient): instance to use for broker communications. + codec (kafka.protocol.ALL_CODECS): compression codec to use. + req_acks (int, optional): A value indicating the acknowledgements that + the server must receive before responding to the request, + defaults to 1 (local ack). + ack_timeout (int, optional): millisecond timeout to wait for the + configured req_acks, defaults to 1000. + async (bool, optional): send message using a background thread, + defaults to False. + batch_send_every_n (int, optional): If async is True, messages are + sent in batches of this size, defaults to 20. + batch_send_every_t (int or float, optional): If async is True, + messages are sent immediately after this timeout in seconds, even + if there are fewer than batch_send_every_n, defaults to 20. + async_retry_limit (int, optional): number of retries for failed messages + or None for unlimited, defaults to None / unlimited. + async_retry_backoff_ms (int, optional): milliseconds to backoff on + failed messages, defaults to 100. + async_retry_on_timeouts (bool, optional): whether to retry on + RequestTimeoutError, defaults to True. + async_queue_maxsize (int, optional): limit to the size of the + internal message queue in number of messages (not size), defaults + to 0 (no limit). + async_queue_put_timeout (int or float, optional): timeout seconds + for queue.put in send_messages for async producers -- will only + apply if async_queue_maxsize > 0 and the queue is Full, + defaults to 0 (fail immediately on full queue). + async_log_messages_on_error (bool, optional): set to False and the + async producer will only log hash() contents on failed produce + requests, defaults to True (log full messages). Hash logging + will not allow you to identify the specific message that failed, + but it will allow you to match failures with retries. + async_stop_timeout (int or float, optional): seconds to continue + attempting to send queued messages after producer.stop(), + defaults to 30. + + Deprecated Arguments: + batch_send (bool, optional): If True, messages are sent by a background + thread in batches, defaults to False. Deprecated, use 'async' """ - ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed - DEFAULT_ACK_TIMEOUT = 1000 - def __init__(self, client, async=False, + def __init__(self, client, req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT, codec=None, - batch_send=False, + async=False, + batch_send=False, # deprecated, use async batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - - if batch_send: - async = True + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, + async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): + + if async: assert batch_send_every_n > 0 assert batch_send_every_t > 0 - else: - batch_send_every_n = 1 - batch_send_every_t = 3600 + assert async_queue_maxsize >= 0 self.client = client self.async = async @@ -135,20 +285,23 @@ class Producer(object): self.codec = codec if self.async: - log.warning("async producer does not guarantee message delivery!") - log.warning("Current implementation does not retry Failed messages") - log.warning("Use at your own risk! (or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout + async_retry_options = RetryOptions( + limit=async_retry_limit, + backoff_ms=async_retry_backoff_ms, + retry_on_timeouts=async_retry_on_timeouts) self.thread_stop_event = Event() - self.thread = Thread(target=_send_upstream, - args=(self.queue, - self.client.copy(), - self.codec, - batch_send_every_t, - batch_send_every_n, - self.req_acks, - self.ack_timeout, - self.thread_stop_event)) + self.thread = Thread( + target=_send_upstream, + args=(self.queue, self.client.copy(), self.codec, + batch_send_every_t, batch_send_every_n, + self.req_acks, self.ack_timeout, + async_retry_options, self.thread_stop_event), + kwargs={'log_messages_on_error': async_log_messages_on_error, + 'stop_timeout': async_stop_timeout} + ) # Thread will die if main thread exits self.thread.daemon = True @@ -199,8 +352,18 @@ class Producer(object): raise TypeError("the key must be type bytes") if self.async: - for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + for idx, m in enumerate(msg): + try: + item = (TopicAndPartition(topic, partition), m, key) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) + except Full: + raise AsyncProducerQueueFull( + msg[idx:], + 'Producer async queue overfilled. ' + 'Current queue size %d.' % self.queue.qsize()) resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 333b6c0..5fe9b12 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -1,16 +1,18 @@ from __future__ import absolute_import import logging +import warnings from kafka.partitioner import HashedPartitioner from kafka.util import kafka_bytestring from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, + ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS ) -log = logging.getLogger("kafka") +log = logging.getLogger(__name__) class KeyedProducer(Producer): @@ -37,16 +39,26 @@ class KeyedProducer(Producer): codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner self.partitioners = {} - super(KeyedProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, + super(KeyedProducer, self).__init__(client, req_acks, ack_timeout, + codec, async, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + async_retry_limit, + async_retry_backoff_ms, + async_retry_on_timeouts, + async_queue_maxsize, + async_queue_put_timeout) def _next_partition(self, topic, key): if topic not in self.partitioners: @@ -58,15 +70,15 @@ class KeyedProducer(Producer): partitioner = self.partitioners[topic] return partitioner.partition(key) - def send_messages(self,topic,key,*msg): + def send_messages(self, topic, key, *msg): topic = kafka_bytestring(topic) partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, *msg,key=key) + return self._send_messages(topic, partition, *msg, key=key) + # DEPRECATED def send(self, topic, key, msg): - topic = kafka_bytestring(topic) - partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, msg, key=key) + warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning) + return self.send_messages(topic, key, msg) def __repr__(self): return '<KeyedProducer batch=%s>' % self.async diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 2699cf2..280a02e 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -10,10 +10,11 @@ from six.moves import xrange from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, + ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS ) -log = logging.getLogger("kafka") +log = logging.getLogger(__name__) class SimpleProducer(Producer): @@ -45,13 +46,23 @@ class SimpleProducer(Producer): batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True): + random_start=True, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): self.partition_cycles = {} self.random_start = random_start - super(SimpleProducer, self).__init__(client, async, req_acks, - ack_timeout, codec, batch_send, + super(SimpleProducer, self).__init__(client, req_acks, ack_timeout, + codec, async, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + async_retry_limit, + async_retry_backoff_ms, + async_retry_on_timeouts, + async_queue_maxsize, + async_queue_put_timeout) def _next_partition(self, topic): if topic not in self.partition_cycles: diff --git a/kafka/protocol.py b/kafka/protocol.py index b34a95d..f12e6a3 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -21,7 +21,8 @@ from kafka.util import ( write_short_string, write_int_string, group_by_topic_and_partition ) -log = logging.getLogger("kafka") + +log = logging.getLogger(__name__) ATTRIBUTE_CODEC_MASK = 0x03 CODEC_NONE = 0x00 diff --git a/kafka/util.py b/kafka/util.py index 78c3607..6d9d307 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -82,6 +82,9 @@ def relative_unpack(fmt, data, cur): def group_by_topic_and_partition(tuples): out = collections.defaultdict(dict) for t in tuples: + assert t.topic not in out or t.partition not in out[t.topic], \ + 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, + t.topic, t.partition) out[t.topic][t.partition] = t return out diff --git a/kafka/version.py b/kafka/version.py new file mode 100644 index 0000000..5b721ed --- /dev/null +++ b/kafka/version.py @@ -0,0 +1 @@ +__version__ = '0.9.4-dev' @@ -2,9 +2,9 @@ import sys import os from setuptools import setup, Command -with open('VERSION', 'r') as v: - __version__ = v.read().rstrip() - +# Pull version from source without importing +# since we can't import something we haven't built yet :) +exec(open('kafka/version.py').read()) class Tox(Command): diff --git a/test/fixtures.py b/test/fixtures.py index 3c496fd..90d01f1 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -11,6 +11,10 @@ from six.moves.urllib.parse import urlparse # pylint: disable-msg=E0611 from test.service import ExternalService, SpawnedService from test.testutil import get_open_port + +log = logging.getLogger(__name__) + + class Fixture(object): kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') scala_version = os.environ.get("SCALA_VERSION", '2.8.0') @@ -35,21 +39,21 @@ class Fixture(object): output_file = os.path.join(output_dir, distfile + '.tgz') if os.path.isfile(output_file): - logging.info("Found file already on disk: %s", output_file) + log.info("Found file already on disk: %s", output_file) return output_file # New tarballs are .tgz, older ones are sometimes .tar.gz try: url = url_base + distfile + '.tgz' - logging.info("Attempting to download %s", url) + log.info("Attempting to download %s", url) response = urllib.request.urlopen(url) except urllib.error.HTTPError: - logging.exception("HTTP Error") + log.exception("HTTP Error") url = url_base + distfile + '.tar.gz' - logging.info("Attempting to download %s", url) + log.info("Attempting to download %s", url) response = urllib.request.urlopen(url) - logging.info("Saving distribution file to %s", output_file) + log.info("Saving distribution file to %s", output_file) with open(output_file, 'w') as output_file_fd: output_file_fd.write(response.read()) @@ -101,14 +105,14 @@ class ZookeeperFixture(Fixture): self.child = None def out(self, message): - logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message) + log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message) def open(self): self.tmp_dir = tempfile.mkdtemp() self.out("Running local instance...") - logging.info(" host = %s", self.host) - logging.info(" port = %s", self.port) - logging.info(" tmp_dir = %s", self.tmp_dir) + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" tmp_dir = %s", self.tmp_dir) # Generate configs template = self.test_resource("zookeeper.properties") @@ -167,7 +171,7 @@ class KafkaFixture(Fixture): self.running = False def out(self, message): - logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message) + log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message) def open(self): if self.running: @@ -176,15 +180,15 @@ class KafkaFixture(Fixture): self.tmp_dir = tempfile.mkdtemp() self.out("Running local instance...") - logging.info(" host = %s", self.host) - logging.info(" port = %s", self.port) - logging.info(" broker_id = %s", self.broker_id) - logging.info(" zk_host = %s", self.zk_host) - logging.info(" zk_port = %s", self.zk_port) - logging.info(" zk_chroot = %s", self.zk_chroot) - logging.info(" replicas = %s", self.replicas) - logging.info(" partitions = %s", self.partitions) - logging.info(" tmp_dir = %s", self.tmp_dir) + log.info(" host = %s", self.host) + log.info(" port = %s", self.port) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zk_host) + log.info(" zk_port = %s", self.zk_port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir) # Create directories os.mkdir(os.path.join(self.tmp_dir, "logs")) diff --git a/test/service.py b/test/service.py index dcd3e68..7627322 100644 --- a/test/service.py +++ b/test/service.py @@ -11,9 +11,13 @@ __all__ = [ ] + +log = logging.getLogger(__name__) + + class ExternalService(object): def __init__(self, host, port): - logging.info("Using already running service at %s:%d", host, port) + log.info("Using already running service at %s:%d", host, port) self.host = host self.port = port @@ -36,19 +40,37 @@ class SpawnedService(threading.Thread): self.captured_stderr = [] self.should_die = threading.Event() + self.child = None + self.alive = False def run(self): self.run_with_handles() - def run_with_handles(self): + def _spawn(self): + if self.alive: return + if self.child and self.child.poll() is None: return + self.child = subprocess.Popen( self.args, env=self.env, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - alive = True + self.alive = True + + def _despawn(self): + self.child.terminate() + self.alive = False + for _ in range(50): + if self.child.poll() is not None: + self.child = None + break + time.sleep(0.1) + else: + self.child.kill() + def run_with_handles(self): + self._spawn() while True: (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) @@ -60,26 +82,22 @@ class SpawnedService(threading.Thread): line = self.child.stderr.readline() self.captured_stderr.append(line.decode('utf-8')) - if self.should_die.is_set(): - self.child.terminate() - alive = False + if self.child.poll() is not None: + self.dump_logs() + self._spawn() - poll_results = self.child.poll() - if poll_results is not None: - if not alive: - break - else: - self.dump_logs() - raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args)) + if self.should_die.is_set(): + self._despawn() + break def dump_logs(self): - logging.critical('stderr') + log.critical('stderr') for line in self.captured_stderr: - logging.critical(line.rstrip()) + log.critical(line.rstrip()) - logging.critical('stdout') + log.critical('stdout') for line in self.captured_stdout: - logging.critical(line.rstrip()) + log.critical(line.rstrip()) def wait_for(self, pattern, timeout=30): t1 = time.time() @@ -89,16 +107,16 @@ class SpawnedService(threading.Thread): try: self.child.kill() except: - logging.exception("Received exception when killing child process") + log.exception("Received exception when killing child process") self.dump_logs() raise RuntimeError("Waiting for %r timed out after %d seconds" % (pattern, timeout)) if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: - logging.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1)) + log.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1)) return if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: - logging.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1)) + log.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1)) return time.sleep(0.1) diff --git a/test/test_codec.py b/test/test_codec.py index 2d7670a..3416fdb 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -13,16 +13,16 @@ from test.testutil import random_string class TestCodec(unittest.TestCase): def test_gzip(self): for i in xrange(1000): - s1 = random_string(100) - s2 = gzip_decode(gzip_encode(s1)) - self.assertEqual(s1, s2) + b1 = random_string(100).encode('utf-8') + b2 = gzip_decode(gzip_encode(b1)) + self.assertEqual(b1, b2) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): for i in xrange(1000): - s1 = random_string(100) - s2 = snappy_decode(snappy_encode(s1)) - self.assertEqual(s1, s2) + b1 = random_string(100).encode('utf-8') + b2 = snappy_decode(snappy_encode(b1)) + self.assertEqual(b1, b2) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_detect_xerial(self): diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index ddb54a7..3825f94 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -475,7 +475,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_kafka_consumer__offset_commit_resume(self): - GROUP_ID = random_string(10) + GROUP_ID = random_string(10).encode('utf-8') self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 3be0189..1d835e2 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -92,15 +92,14 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 100 before + 1 recovery + 100 after self.assert_message_count(topic, 201, partitions=(partition,)) - - #@kafka_versions("all") - @unittest.skip("async producer does not support reliable failover yet") + @kafka_versions("all") def test_switch_leader_async(self): topic = self.topic partition = 0 # Test the base class Producer -- send_messages to a specific partition - producer = Producer(self.client, async=True) + producer = Producer(self.client, async=True, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) # Send 10 random messages self._send_random_messages(producer, topic, partition, 10) @@ -111,19 +110,26 @@ class TestFailover(KafkaIntegrationTestCase): logging.debug("attempting to send 'success' message after leader killed") # in async mode, this should return immediately - producer.send_messages(topic, partition, 'success') + producer.send_messages(topic, partition, b'success') # send to new leader self._send_random_messages(producer, topic, partition, 10) - # wait until producer queue is empty - while not producer.queue.empty(): - time.sleep(0.1) + # Stop the producer and wait for it to shutdown producer.stop() + started = time.time() + timeout = 60 + while (time.time() - started) < timeout: + if not producer.thread.is_alive(): + break + time.sleep(0.1) + else: + self.fail('timeout waiting for producer queue to empty') # count number of messages # Should be equal to 10 before + 1 recovery + 10 after - self.assert_message_count(topic, 21, partitions=(partition,)) + self.assert_message_count(topic, 21, partitions=(partition,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): @@ -133,8 +139,8 @@ class TestFailover(KafkaIntegrationTestCase): # Send 10 random messages for _ in range(10): - key = random_string(3) - msg = random_string(10) + key = random_string(3).encode('utf-8') + msg = random_string(10).encode('utf-8') producer.send_messages(topic, key, msg) # kill leader for partition 0 @@ -145,8 +151,8 @@ class TestFailover(KafkaIntegrationTestCase): timeout = 60 while not recovered and (time.time() - started) < timeout: try: - key = random_string(3) - msg = random_string(10) + key = random_string(3).encode('utf-8') + msg = random_string(10).encode('utf-8') producer.send_messages(topic, key, msg) if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0: recovered = True @@ -159,15 +165,24 @@ class TestFailover(KafkaIntegrationTestCase): # send some more messages just to make sure no more exceptions for _ in range(10): - key = random_string(3) - msg = random_string(10) + key = random_string(3).encode('utf-8') + msg = random_string(10).encode('utf-8') producer.send_messages(topic, key, msg) + @kafka_versions("all") + def test_switch_leader_simple_consumer(self): + producer = Producer(self.client, async=False) + consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10) + self._send_random_messages(producer, self.topic, 0, 2) + consumer.get_messages() + self._kill_leader(self.topic, 0) + consumer.get_messages() def _send_random_messages(self, producer, topic, partition, n): for j in range(n): logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) - resp = producer.send_messages(topic, partition, random_string(10)) + msg = 'msg {0}: {1}'.format(j, random_string(10)) + resp = producer.send_messages(topic, partition, msg.encode('utf-8')) if len(resp) > 0: self.assertEqual(resp[0].error, 0) logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j) @@ -178,12 +193,12 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def assert_message_count(self, topic, check_count, timeout=10, partitions=None): + def assert_message_count(self, topic, check_count, timeout=10, + partitions=None, at_least=False): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) - group = random_string(10) consumer = SimpleConsumer(client, None, topic, partitions=partitions, auto_commit=False, @@ -193,10 +208,17 @@ class TestFailover(KafkaIntegrationTestCase): pending = consumer.pending(partitions) # Keep checking if it isn't immediately correct, subject to timeout - while pending != check_count and (time.time() - started_at < timeout): + while pending < check_count and (time.time() - started_at < timeout): pending = consumer.pending(partitions) + time.sleep(0.5) consumer.stop() client.close() - self.assertEqual(pending, check_count) + if pending < check_count: + self.fail('Too few pending messages: found %d, expected %d' % + (pending, check_count)) + elif pending > check_count and not at_least: + self.fail('Too many pending messages: found %d, expected %d' % + (pending, check_count)) + return True diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a..c12af02 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,11 +1,26 @@ # -*- coding: utf-8 -*- +import time import logging -from mock import MagicMock +from mock import MagicMock, patch from . import unittest +from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions +from kafka.common import AsyncProducerQueueFull from kafka.producer.base import Producer +from kafka.producer.base import _send_upstream +from kafka.protocol import CODEC_NONE + +import threading +try: + from queue import Empty, Queue +except ImportError: + from Queue import Empty, Queue +try: + xrange +except NameError: + xrange = range class TestKafkaProducer(unittest.TestCase): @@ -40,3 +55,113 @@ class TestKafkaProducer(unittest.TestCase): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): + queue_size = 2 + producer = Producer(MagicMock(), async=True, + async_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + with self.assertRaises(AsyncProducerQueueFull): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() + + +class TestKafkaProducerSendUpstream(unittest.TestCase): + + def setUp(self): + self.client = MagicMock() + self.queue = Queue() + + def _run_process(self, retries_limit=3, sleep_timeout=1): + # run _send_upstream process with the queue + stop_event = threading.Event() + retry_options = RetryOptions(limit=retries_limit, + backoff_ms=50, + retry_on_timeouts=False) + self.thread = threading.Thread( + target=_send_upstream, + args=(self.queue, self.client, CODEC_NONE, + 0.3, # batch time (seconds) + 3, # batch length + Producer.ACK_AFTER_LOCAL_WRITE, + Producer.DEFAULT_ACK_TIMEOUT, + retry_options, + stop_event)) + self.thread.daemon = True + self.thread.start() + time.sleep(sleep_timeout) + stop_event.set() + + def test_wo_retries(self): + + # lets create a queue and add 10 messages for 1 partition + for i in range(10): + self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i")) + + self._run_process() + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 4 non-void cals: + # 3 batches of 3 msgs each + 1 batch of 1 message + self.assertEqual(self.client.send_produce_request.call_count, 4) + + def test_first_send_failed(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + self.client.is_first_time = True + def send_side_effect(reqs, *args, **kwargs): + if self.client.is_first_time: + self.client.is_first_time = False + return [FailedPayloadsError(req) for req in reqs] + return [] + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void cals: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.client.send_produce_request.call_count, 5) + + def test_with_limited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i)) + + def send_side_effect(reqs, *args, **kwargs): + return [FailedPayloadsError(req) for req in reqs] + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(3, 3) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 16 non-void cals: + # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + + # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed + self.assertEqual(self.client.send_produce_request.call_count, 16) + + def tearDown(self): + for _ in xrange(self.queue.qsize()): + self.queue.get() diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index e3f7767..3439f4e 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -221,13 +221,14 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): batch_interval = 5 producer = SimpleProducer( self.client, - batch_send=True, + async=True, batch_send_every_n=batch_messages, batch_send_every_t=batch_interval, random_start=False) # Send 4 messages -- should not trigger a batch - resp = producer.send_messages(self.topic, + resp = producer.send_messages( + self.topic, self.msg("one"), self.msg("two"), self.msg("three"), @@ -242,7 +243,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(partitions[1], start_offsets[1], []) # send 3 more messages -- should trigger batch on first 5 - resp = producer.send_messages(self.topic, + resp = producer.send_messages( + self.topic, self.msg("five"), self.msg("six"), self.msg("seven"), @@ -251,6 +253,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # Batch mode is async. No ack self.assertEqual(len(resp), 0) + # Wait until producer has pulled all messages from internal queue + # this should signal that the first batch was sent, and the producer + # is now waiting for enough messages to batch again (or a timeout) + timeout = 5 + start = time.time() + while not producer.queue.empty(): + if time.time() - start > timeout: + self.fail('timeout waiting for producer queue to empty') + time.sleep(0.1) + # send messages groups all *msgs in a single call to the same partition # so we should see all messages from the first call in one partition self.assert_fetch_offset(partitions[0], start_offsets[0], [ @@ -273,14 +285,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offsets = [self.current_offset(self.topic, p) for p in partitions] batch_interval = 5 - producer = SimpleProducer(self.client, - batch_send=True, + producer = SimpleProducer( + self.client, + async=True, batch_send_every_n=100, batch_send_every_t=batch_interval, random_start=False) # Send 5 messages and do a fetch - resp = producer.send_messages(self.topic, + resp = producer.send_messages( + self.topic, self.msg("one"), self.msg("two"), self.msg("three"), @@ -332,10 +346,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offsets = [self.current_offset(self.topic, p) for p in partitions] producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - resp1 = producer.send(self.topic, self.key("key1"), self.msg("one")) - resp2 = producer.send(self.topic, self.key("key2"), self.msg("two")) - resp3 = producer.send(self.topic, self.key("key3"), self.msg("three")) - resp4 = producer.send(self.topic, self.key("key4"), self.msg("four")) + resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two")) + resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three")) + resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) self.assert_produce_response(resp1, start_offsets[0]+0) self.assert_produce_response(resp2, start_offsets[1]+0) @@ -353,11 +367,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offsets = [self.current_offset(self.topic, p) for p in partitions] producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - resp1 = producer.send(self.topic, self.key("1"), self.msg("one")) - resp2 = producer.send(self.topic, self.key("2"), self.msg("two")) - resp3 = producer.send(self.topic, self.key("3"), self.msg("three")) - resp4 = producer.send(self.topic, self.key("3"), self.msg("four")) - resp5 = producer.send(self.topic, self.key("4"), self.msg("five")) + resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one")) + resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two")) + resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three")) + resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four")) + resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five")) offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]} messages = {partitions[0]: [], partitions[1]: []} @@ -386,7 +400,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) - resp = producer.send(self.topic, self.key("key1"), self.msg("one")) + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) self.assertEqual(len(resp), 0) # wait for the server to report a new highwatermark diff --git a/test/test_util.py b/test/test_util.py index 6a8f45b..ea3783e 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -108,7 +108,6 @@ class UtilTest(unittest.TestCase): l = [ t("a", 1), - t("a", 1), t("a", 2), t("a", 3), t("b", 3), @@ -124,3 +123,8 @@ class UtilTest(unittest.TestCase): 3: t("b", 3), } }) + + # should not be able to group duplicate topic-partitions + t1 = t("a", 1) + with self.assertRaises(AssertionError): + kafka.util.group_by_topic_and_partition([t1, t1]) diff --git a/test/testutil.py b/test/testutil.py index e6947b4..1f1a1df 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -23,8 +23,7 @@ __all__ = [ ] def random_string(l): - s = "".join(random.choice(string.ascii_letters) for i in xrange(l)) - return s.encode('utf-8') + return "".join(random.choice(string.ascii_letters) for i in xrange(l)) def kafka_versions(*versions): def kafka_versions(func): @@ -60,7 +59,7 @@ class KafkaIntegrationTestCase(unittest.TestCase): return if not self.topic: - topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8')) + topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) self.topic = topic self.bytes_topic = topic.encode('utf-8') @@ -14,6 +14,7 @@ commands = nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka} setenv = PROJECT_ROOT = {toxinidir} +passenv = KAFKA_VERSION [testenv:py33] deps = |