summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-26 17:13:47 -0700
committerDana Powers <dana.powers@rd.io>2014-08-26 19:24:33 -0700
commita94005486ff7329aacbe8f89524de4b237ab0103 (patch)
tree945c04ad90d0a6dd8e2630f7b3c8105d930d898f
parent26042ae7f53fbcfd4ccdd6c4d5b11c54ba04f312 (diff)
downloadkafka-python-a94005486ff7329aacbe8f89524de4b237ab0103.tar.gz
Add warnings to README, docstring, and logging that async producer does not retry failed messages
-rw-r--r--README.md4
-rw-r--r--kafka/producer.py6
2 files changed, 9 insertions, 1 deletions
diff --git a/README.md b/README.md
index e176bad..1c0e343 100644
--- a/README.md
+++ b/README.md
@@ -51,6 +51,8 @@ producer.send_messages("my-topic", "this method", "is variadic")
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
# To send messages asynchronously
+# WARNING: current implementation does not guarantee message delivery on failure!
+# messages can get dropped! Use at your own risk! Or help us improve with a PR!
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")
@@ -63,7 +65,7 @@ producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)
-response = producer.send_messages("my-topic", "async message")
+response = producer.send_messages("my-topic", "another message")
if response:
print(response[0].error)
diff --git a/kafka/producer.py b/kafka/producer.py
index 800e677..8a6bff0 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -87,6 +87,9 @@ class Producer(object):
client - The Kafka client instance to use
async - If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
+ WARNING!!! current implementation of async producer does not
+ guarantee message delivery. Use at your own risk! Or help us
+ improve with a PR!
req_acks - A value indicating the acknowledgements that the server must
receive before responding to the request
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
@@ -131,6 +134,9 @@ class Producer(object):
self.codec = codec
if self.async:
+ log.warning("async producer does not guarantee message delivery!")
+ log.warning("Current implementation does not retry Failed messages")
+ log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
args=(self.queue,