summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-05 14:45:44 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 19:27:56 -0700
commitc76bc9dd179044811a5e0d3fde2e437f1ee6d46c (patch)
treea11d6b378dba07947cdddbae785e0173dccf681f /kafka
parentaa217e05448b4eced017b5ecdcb020a4411f863f (diff)
downloadkafka-python-c76bc9dd179044811a5e0d3fde2e437f1ee6d46c.tar.gz
Update Producer class docstring
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/base.py29
1 files changed, 25 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 9f4942b..a0bf47c 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -44,10 +44,31 @@ STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
- """
- Listen on the queue for a specified number of messages or till
- a specified timeout and send them upstream to the brokers in one
- request
+ """Private method to manage producing messages asynchronously
+
+ Listens on the queue for a specified number of messages or until
+ a specified timeout and then sends messages to the brokers in grouped
+ requests (one per broker).
+
+ Messages placed on the queue should be tuples that conform to this format:
+ ((topic, partition), message, key)
+
+ Currently does not mark messages with task_done. Do not attempt to join()!
+
+ Arguments:
+ queue (threading.Queue): the queue from which to get messages
+ client (KafkaClient): instance to use for communicating with brokers
+ codec (kafka.protocol.ALL_CODECS): compression codec to use
+ batch_time (int): interval in seconds to send message batches
+ batch_size (int): count of messages that will trigger an immediate send
+ req_acks: required acks to use with ProduceRequests. see server protocol
+ ack_timeout: timeout to wait for required acks. see server protocol
+ retry_options (RetryOptions): settings for retry limits, backoff etc
+ stop_event (threading.Event): event to monitor for shutdown signal.
+ when this event is 'set', the producer will stop sending messages.
+ log_messages_on_error (bool, optional): log stringified message-contents
+ on any produce error, otherwise only log a hash() of the contents,
+ defaults to True.
"""
request_tries = {}
client.reinit()