author    Dana Powers <dana.powers@rd.io>    2016-01-07 18:51:14 -0800
committer Dana Powers <dana.powers@rd.io>    2016-01-07 18:51:14 -0800
commit    828377377da43749af0d27ee256ef31bf714cf17 (patch)
tree      fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/producer
parent    71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent    9a8af1499ca425366d934487469d9977fae7fe5f (diff)
download  kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'
Conflicts:
	kafka/codec.py
	kafka/version.py
	test/test_producer.py
	test/test_producer_integration.py
Diffstat (limited to 'kafka/producer')
-rw-r--r--  kafka/producer/base.py   | 89
-rw-r--r--  kafka/producer/keyed.py  |  2
-rw-r--r--  kafka/producer/simple.py |  3
3 files changed, 51 insertions, 43 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 39b1f84..506da83 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
import time
try:
- from queue import Empty, Full, Queue
+ from queue import Empty, Full, Queue # pylint: disable=import-error
except ImportError:
- from Queue import Empty, Full, Queue
+ from Queue import Empty, Full, Queue # pylint: disable=import-error
from collections import defaultdict
from threading import Thread, Event
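The hunk above imports the stdlib queue primitives in a Python 2/3 compatible way; pylint only resolves one interpreter's stdlib, hence the inline disables on both branches. A runnable sketch (queue size and payloads are made up) of why Empty and Full matter to the async producer, which bounds its internal queue and surfaces overflow as AsyncProducerQueueFull:

    try:
        from queue import Empty, Full, Queue   # Python 3
    except ImportError:
        from Queue import Empty, Full, Queue   # Python 2

    q = Queue(maxsize=2)           # bounded, like the producer's internal queue
    q.put_nowait(b'msg-1')
    q.put_nowait(b'msg-2')
    try:
        q.put_nowait(b'msg-3')     # over capacity -> raises Full
    except Full:
        pass                       # the producer reports this as AsyncProducerQueueFull
    try:
        while True:
            q.get(timeout=0.1)     # drains both items, then raises Empty
    except Empty:
        pass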
@@ -15,14 +15,13 @@ from threading import Thread, Event
import six
from kafka.common import (
- ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
+ ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
-from kafka.util import kafka_bytestring
log = logging.getLogger('kafka.producer')
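ProduceRequestPayload, ProduceResponsePayload, and TopicPartition are renames of the old ProduceRequest, ProduceResponse, and TopicAndPartition namedtuples. A hedged sketch mirroring how the hunks below construct them (topic name and partition are made up):

    from kafka.common import ProduceRequestPayload, TopicPartition
    from kafka.protocol import CODEC_NONE, create_message_set

    # create_message_set takes (value, key) pairs, as _send_messages does below
    messages = create_message_set([(b'hello', None)], CODEC_NONE, None, None)
    req = ProduceRequestPayload('my-topic', 0, tuple(messages))

    # async queue items now carry the renamed TopicPartition tuple
    item = (TopicPartition('my-topic', 0), b'hello', None)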
@@ -62,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
Arguments:
queue (threading.Queue): the queue from which to get messages
- client (KafkaClient): instance to use for communicating with brokers
+ client (kafka.SimpleClient): instance to use for communicating
+ with brokers
codec (kafka.protocol.ALL_CODECS): compression codec to use
batch_time (int): interval in seconds to send message batches
batch_size (int): count of messages that will trigger an immediate send
@@ -133,9 +133,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key, codec_compresslevel)
- req = ProduceRequest(topic_partition.topic,
- topic_partition.partition,
- tuple(messages))
+ req = ProduceRequestPayload(
+ topic_partition.topic,
+ topic_partition.partition,
+ tuple(messages))
request_tries[req] = 0
if not request_tries:
@@ -169,13 +170,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
error_cls = response.__class__
orig_req = response.payload
- elif isinstance(response, ProduceResponse) and response.error:
+ elif isinstance(response, ProduceResponsePayload) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)
- log.error('%s sending ProduceRequest (#%d of %d) '
+ log.error('%s sending ProduceRequestPayload (#%d of %d) '
'to %s:%d with msgs %s',
error_cls.__name__, (i + 1), len(requests),
orig_req.topic, orig_req.partition,
@@ -196,8 +197,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
log.warn('Async producer forcing metadata refresh before retrying')
try:
client.load_metadata_for_topics()
- except Exception as e:
- log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message)
+ except Exception:
+ log.exception("Async producer couldn't reload topic metadata.")
# Apply retry limit, dropping messages that are over
request_tries = dict(
@@ -210,7 +211,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Log messages we are going to retry
for orig_req in request_tries.keys():
- log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+ log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s',
orig_req.topic, orig_req.partition,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
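The metadata-refresh hunk above also swaps log.error(..., e.message) for log.exception(...): Exception.message no longer exists on Python 3, and log.exception records the full traceback for free. A self-contained sketch of the pattern:

    import logging

    logging.basicConfig()
    log = logging.getLogger('kafka.producer')

    try:
        raise RuntimeError('brokers unreachable')  # stand-in for load_metadata_for_topics()
    except Exception:
        # logs the message plus the active traceback; no e.message needed
        log.exception("Async producer couldn't reload topic metadata.")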
@@ -225,9 +226,9 @@ class Producer(object):
Base class to be used by producers
Arguments:
- client (KafkaClient): instance to use for broker communications.
- If async=True, the background thread will use client.copy(),
- which is expected to return a thread-safe object.
+ client (kafka.SimpleClient): instance to use for broker
+ communications. If async=True, the background thread will use
+ client.copy(), which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
@@ -345,22 +346,37 @@ class Producer(object):
self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
+ """Helper method to send produce requests.
+
+ Note that msg payloads *must* be encoded to bytes by the user. Passing a
+ unicode message will not work; for example, you should encode before calling
+ send_messages via something like `unicode_message.encode('utf-8')`.
+ All messages sent via this method will set the message 'key' to None.
+
+ Arguments:
+ topic (str): name of topic for produce request
+ partition (int): partition number for produce request
+ *msg (bytes): one or more message payloads
+
+ Returns:
+ list of ProduceResponsePayload instances returned by the server
+
+ Raises:
+ FailedPayloadsError: low-level connection error, can be caused by
+ networking failures, or a malformed request.
+ ConnectionError:
+ KafkaUnavailableError: all known brokers are down when attempting
+ to refresh metadata.
+ LeaderNotAvailableError: topic or partition is initializing or
+ a broker failed and leadership election is in progress.
+ NotLeaderForPartitionError: metadata is out of sync; the broker
+ that the request was sent to is not the leader for the topic
+ or partition.
+ UnknownTopicOrPartitionError: the topic or partition has not
+ been created yet and auto-creation is not available.
+ AsyncProducerQueueFull: in async mode, if too many messages are
+ unsent and remain in the internal queue.
"""
- Helper method to send produce requests
- @param: topic, name of topic for produce request -- type str
- @param: partition, partition number for produce request -- type int
- @param: *msg, one or more message payloads -- type bytes
- @returns: ResponseRequest returned by server
- raises on error
-
- Note that msg type *must* be encoded to bytes by user.
- Passing unicode message will not work, for example
- you should encode before calling send_messages via
- something like `unicode_message.encode('utf-8')`
-
- All messages produced via this method will set the message 'key' to Null
- """
- topic = kafka_bytestring(topic)
return self._send_messages(topic, partition, *msg)
def _send_messages(self, topic, partition, *msg, **kwargs):
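A usage sketch for the contract documented above (broker address and topic are assumptions, not from the diff). With kafka_bytestring gone, the topic stays a plain string, but every payload must already be bytes:

    from kafka import SimpleClient
    from kafka.producer.base import Producer

    client = SimpleClient('localhost:9092')   # assumed broker address
    producer = Producer(client)

    # topic is str, partition is int, payloads are bytes; the key is always None here
    resp = producer.send_messages('my-topic', 0,
                                  u'hello'.encode('utf-8'), b'world')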
@@ -380,10 +396,6 @@ class Producer(object):
elif not isinstance(m, six.binary_type):
raise TypeError("all produce message payloads must be null or type bytes")
- # Raise TypeError if topic is not encoded as bytes
- if not isinstance(topic, six.binary_type):
- raise TypeError("the topic must be type bytes")
-
# Raise TypeError if the key is not encoded as bytes
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")
@@ -391,7 +403,7 @@ class Producer(object):
if self.async:
for idx, m in enumerate(msg):
try:
- item = (TopicAndPartition(topic, partition), m, key)
+ item = (TopicPartition(topic, partition), m, key)
if self.async_queue_put_timeout == 0:
self.queue.put_nowait(item)
else:
@@ -404,7 +416,7 @@ class Producer(object):
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
- req = ProduceRequest(topic, partition, messages)
+ req = ProduceRequestPayload(topic, partition, messages)
try:
resp = self.client.send_produce_request(
[req], acks=self.req_acks, timeout=self.ack_timeout,
@@ -449,7 +461,8 @@ class Producer(object):
# ValueError on list.remove() if the exithandler no longer exists
# but that is fine here
try:
- atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ atexit._exithandlers.remove( # pylint: disable=no-member
+ (self._cleanup_func, (self,), {}))
except ValueError:
pass
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index a5a26c9..f35aef0 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -5,7 +5,6 @@ import warnings
from .base import Producer
from ..partitioner import HashedPartitioner
-from ..util import kafka_bytestring
log = logging.getLogger(__name__)
@@ -38,7 +37,6 @@ class KeyedProducer(Producer):
return partitioner.partition(key)
def send_messages(self, topic, key, *msg):
- topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg, key=key)
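With the kafka_bytestring call removed, KeyedProducer forwards the topic unchanged; only the key and payloads must be bytes. A hedged sketch (broker address, topic, and key are made up):

    from kafka import SimpleClient, KeyedProducer

    client = SimpleClient('localhost:9092')   # assumed broker address
    producer = KeyedProducer(client)

    # the key is hashed (HashedPartitioner by default) to pick the partition
    producer.send_messages('my-topic', b'user-42', b'payload')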
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 78d5a4d..1406be6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -46,9 +46,6 @@ class SimpleProducer(Producer):
return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
- if not isinstance(topic, six.binary_type):
- topic = topic.encode('utf-8')
-
partition = self._next_partition(topic)
return super(SimpleProducer, self).send_messages(
topic, partition, *msg
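SimpleProducer likewise stops encoding the topic on the caller's behalf. A minimal sketch under the same assumptions:

    from kafka import SimpleClient, SimpleProducer

    client = SimpleClient('localhost:9092')   # assumed broker address
    producer = SimpleProducer(client)

    # the partition is chosen by cycling through the topic's partitions
    producer.send_messages('my-topic', b'msg-1', b'msg-2')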