summaryrefslogtreecommitdiff
path: root/kafka/producer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--kafka/producer/base.py53
1 files changed, 35 insertions, 18 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4f5edbc..506da83 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -61,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
Arguments:
queue (threading.Queue): the queue from which to get messages
- client (KafkaClient): instance to use for communicating with brokers
+ client (kafka.SimpleClient): instance to use for communicating
+ with brokers
codec (kafka.protocol.ALL_CODECS): compression codec to use
batch_time (int): interval in seconds to send message batches
batch_size (int): count of messages that will trigger an immediate send
@@ -225,9 +226,9 @@ class Producer(object):
Base class to be used by producers
Arguments:
- client (KafkaClient): instance to use for broker communications.
- If async=True, the background thread will use client.copy(),
- which is expected to return a thread-safe object.
+ client (kafka.SimpleClient): instance to use for broker
+ communications. If async=True, the background thread will use
+ client.copy(), which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
@@ -345,20 +346,36 @@ class Producer(object):
self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
- """
- Helper method to send produce requests
- @param: topic, name of topic for produce request -- type str
- @param: partition, partition number for produce request -- type int
- @param: *msg, one or more message payloads -- type bytes
- @returns: ResponseRequest returned by server
- raises on error
-
- Note that msg type *must* be encoded to bytes by user.
- Passing unicode message will not work, for example
- you should encode before calling send_messages via
- something like `unicode_message.encode('utf-8')`
-
- All messages produced via this method will set the message 'key' to Null
+ """Helper method to send produce requests.
+
+ Note that msg type *must* be encoded to bytes by user. Passing unicode
+ message will not work, for example you should encode before calling
+ send_messages via something like `unicode_message.encode('utf-8')`
+ All messages will set the message 'key' to None.
+
+ Arguments:
+ topic (str): name of topic for produce request
+ partition (int): partition number for produce request
+ *msg (bytes): one or more message payloads
+
+ Returns:
+ ResponseRequest returned by server
+
+ Raises:
+ FailedPayloadsError: low-level connection error, can be caused by
+ networking failures, or a malformed request.
+ ConnectionError:
+ KafkaUnavailableError: all known brokers are down when attempting
+ to refresh metadata.
+ LeaderNotAvailableError: topic or partition is initializing or
+ a broker failed and leadership election is in progress.
+ NotLeaderForPartitionError: metadata is out of sync; the broker
+ that the request was sent to is not the leader for the topic
+ or partition.
+ UnknownTopicOrPartitionError: the topic or partition has not
+ been created yet and auto-creation is not available.
+ AsyncProducerQueueFull: in async mode, if too many messages are
+ unsent and remain in the internal queue.
"""
return self._send_messages(topic, partition, *msg)