author     Dana Powers <dana.powers@rd.io>          2016-01-07 18:51:14 -0800
committer  Dana Powers <dana.powers@rd.io>          2016-01-07 18:51:14 -0800
commit     828377377da43749af0d27ee256ef31bf714cf17 (patch)
tree       fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/producer
parent     71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent     9a8af1499ca425366d934487469d9977fae7fe5f (diff)
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'kafka/producer')
-rw-r--r--  kafka/producer/base.py   | 89
-rw-r--r--  kafka/producer/keyed.py  |  2
-rw-r--r--  kafka/producer/simple.py |  3
3 files changed, 51 insertions, 43 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 39b1f84..506da83 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
 import time
 
 try:
-    from queue import Empty, Full, Queue
+    from queue import Empty, Full, Queue  # pylint: disable=import-error
 except ImportError:
-    from Queue import Empty, Full, Queue
+    from Queue import Empty, Full, Queue  # pylint: disable=import-error
 from collections import defaultdict
 
 from threading import Thread, Event
@@ -15,14 +15,13 @@ from threading import Thread, Event
 import six
 
 from kafka.common import (
-    ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
-from kafka.util import kafka_bytestring
 
 
 log = logging.getLogger('kafka.producer')
@@ -62,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
     Arguments:
         queue (threading.Queue): the queue from which to get messages
-        client (KafkaClient): instance to use for communicating with brokers
+        client (kafka.SimpleClient): instance to use for communicating
+            with brokers
         codec (kafka.protocol.ALL_CODECS): compression codec to use
         batch_time (int): interval in seconds to send message batches
         batch_size (int): count of messages that will trigger an immediate send
@@ -133,9 +133,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         # Send collected requests upstream
         for topic_partition, msg in msgset.items():
             messages = create_message_set(msg, codec, key, codec_compresslevel)
-            req = ProduceRequest(topic_partition.topic,
-                                 topic_partition.partition,
-                                 tuple(messages))
+            req = ProduceRequestPayload(
+                topic_partition.topic,
+                topic_partition.partition,
+                tuple(messages))
             request_tries[req] = 0
 
         if not request_tries:
@@ -169,13 +170,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                 error_cls = response.__class__
                 orig_req = response.payload
 
-            elif isinstance(response, ProduceResponse) and response.error:
+            elif isinstance(response, ProduceResponsePayload) and response.error:
                 error_cls = kafka_errors.get(response.error, UnknownError)
                 orig_req = requests[i]
 
             if error_cls:
                 _handle_error(error_cls, orig_req)
-                log.error('%s sending ProduceRequest (#%d of %d) '
+                log.error('%s sending ProduceRequestPayload (#%d of %d) '
                           'to %s:%d with msgs %s',
                           error_cls.__name__, (i + 1), len(requests),
                           orig_req.topic, orig_req.partition,
@@ -196,8 +197,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             log.warn('Async producer forcing metadata refresh metadata before retrying')
             try:
                 client.load_metadata_for_topics()
-            except Exception as e:
-                log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message)
+            except Exception:
+                log.exception("Async producer couldn't reload topic metadata.")
 
             # Apply retry limit, dropping messages that are over
             request_tries = dict(
@@ -210,7 +211,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
         # Log messages we are going to retry
        for orig_req in request_tries.keys():
-            log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+            log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s',
                      orig_req.topic, orig_req.partition,
                      orig_req.messages if log_messages_on_error else
                      hash(orig_req.messages))
@@ -225,9 +226,9 @@ class Producer(object):
     Base class to be used by producers
 
     Arguments:
-        client (KafkaClient): instance to use for broker communications.
-            If async=True, the background thread will use client.copy(),
-            which is expected to return a thread-safe object.
+        client (kafka.SimpleClient): instance to use for broker
+            communications. If async=True, the background thread will use
+            client.copy(), which is expected to return a thread-safe object.
         codec (kafka.protocol.ALL_CODECS): compression codec to use.
         req_acks (int, optional): A value indicating the acknowledgements that
            the server must receive before responding to the request,
@@ -345,22 +346,37 @@ class Producer(object):
         self.sync_fail_on_error = sync_fail_on_error
 
     def send_messages(self, topic, partition, *msg):
+        """Helper method to send produce requests.
+
+        Note that msg type *must* be encoded to bytes by user. Passing unicode
+        message will not work, for example you should encode before calling
+        send_messages via something like `unicode_message.encode('utf-8')`
+        All messages will set the message 'key' to None.
+
+        Arguments:
+            topic (str): name of topic for produce request
+            partition (int): partition number for produce request
+            *msg (bytes): one or more message payloads
+
+        Returns:
+            ResponseRequest returned by server
+
+        Raises:
+            FailedPayloadsError: low-level connection error, can be caused by
+                networking failures, or a malformed request.
+            ConnectionError:
+            KafkaUnavailableError: all known brokers are down when attempting
+                to refresh metadata.
+            LeaderNotAvailableError: topic or partition is initializing or
+                a broker failed and leadership election is in progress.
+            NotLeaderForPartitionError: metadata is out of sync; the broker
+                that the request was sent to is not the leader for the topic
+                or partition.
+            UnknownTopicOrPartitionError: the topic or partition has not
+                been created yet and auto-creation is not available.
+            AsyncProducerQueueFull: in async mode, if too many messages are
+                unsent and remain in the internal queue.
         """
-        Helper method to send produce requests
-        @param: topic, name of topic for produce request -- type str
-        @param: partition, partition number for produce request -- type int
-        @param: *msg, one or more message payloads -- type bytes
-        @returns: ResponseRequest returned by server
-        raises on error
-
-        Note that msg type *must* be encoded to bytes by user.
-        Passing unicode message will not work, for example
-        you should encode before calling send_messages via
-        something like `unicode_message.encode('utf-8')`
-
-        All messages produced via this method will set the message 'key' to Null
-        """
-        topic = kafka_bytestring(topic)
         return self._send_messages(topic, partition, *msg)
 
     def _send_messages(self, topic, partition, *msg, **kwargs):
@@ -380,10 +396,6 @@ class Producer(object):
             elif not isinstance(m, six.binary_type):
                 raise TypeError("all produce message payloads must be null or type bytes")
 
-        # Raise TypeError if topic is not encoded as bytes
-        if not isinstance(topic, six.binary_type):
-            raise TypeError("the topic must be type bytes")
-
         # Raise TypeError if the key is not encoded as bytes
         if key is not None and not isinstance(key, six.binary_type):
             raise TypeError("the key must be type bytes")
@@ -391,7 +403,7 @@ class Producer(object):
         if self.async:
             for idx, m in enumerate(msg):
                 try:
-                    item = (TopicAndPartition(topic, partition), m, key)
+                    item = (TopicPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
                         self.queue.put_nowait(item)
                     else:
@@ -404,7 +416,7 @@ class Producer(object):
             resp = []
         else:
             messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
-            req = ProduceRequest(topic, partition, messages)
+            req = ProduceRequestPayload(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request(
                     [req], acks=self.req_acks, timeout=self.ack_timeout,
@@ -449,7 +461,8 @@ class Producer(object):
             # ValueError on list.remove() if the exithandler no longer exists
             # but that is fine here
             try:
-                atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+                atexit._exithandlers.remove(  # pylint: disable=no-member
+                    (self._cleanup_func, (self,), {}))
             except ValueError:
                 pass
 
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index a5a26c9..f35aef0 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -5,7 +5,6 @@ import warnings
 
 from .base import Producer
 from ..partitioner import HashedPartitioner
-from ..util import kafka_bytestring
 
 
 log = logging.getLogger(__name__)
@@ -38,7 +37,6 @@ class KeyedProducer(Producer):
         return partitioner.partition(key)
 
     def send_messages(self, topic, key, *msg):
-        topic = kafka_bytestring(topic)
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, *msg, key=key)
 
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 78d5a4d..1406be6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -46,9 +46,6 @@ class SimpleProducer(Producer):
         return next(self.partition_cycles[topic])
 
     def send_messages(self, topic, *msg):
-        if not isinstance(topic, six.binary_type):
-            topic = topic.encode('utf-8')
-
         partition = self._next_partition(topic)
         return super(SimpleProducer, self).send_messages(
             topic, partition, *msg)
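For reference, the caller-visible effect of this merge: the payload classes carry new names (ProduceRequest/ProduceResponse become ProduceRequestPayload/ProduceResponsePayload, TopicAndPartition becomes TopicPartition), and topic names are no longer coerced with kafka_bytestring(), so topics are passed as plain str while message payloads and keys must still be bytes. A minimal usage sketch under those rules; the broker address and topic name are hypothetical, and it assumes SimpleClient is importable from the top-level kafka package as the updated docstrings suggest:

    from kafka import SimpleClient, SimpleProducer, KeyedProducer

    # Hypothetical broker address; any reachable bootstrap host works.
    client = SimpleClient('localhost:9092')

    # Topic is a plain str now -- no manual encoding, no kafka_bytestring().
    producer = SimpleProducer(client)

    # Payloads must be bytes; encode unicode yourself before sending.
    producer.send_messages('my-topic', u'hello'.encode('utf-8'), b'world')

    # Keys must be bytes as well; KeyedProducer derives the partition from the key.
    keyed = KeyedProducer(client)
    keyed.send_messages('my-topic', b'key-1', b'payload')

Passing a unicode payload raises TypeError("all produce message payloads must be null or type bytes"), per the check retained in _send_messages; the old "topic must be type bytes" check is gone.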