summaryrefslogtreecommitdiff
path: root/kafka/protocol/produce.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-11-23 05:26:13 +0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:39 -0800
commit3f65ff4ab93f2282af442e6bb5e54e3af1d602db (patch)
treea800a231d7ad0a0ace40db2d59b7d6fe7a42ab38 /kafka/protocol/produce.py
parenta0be374ce36f00ebb11a1e211ecee715999d9e8b (diff)
downloadkafka-python-3f65ff4ab93f2282af442e6bb5e54e3af1d602db.tar.gz
Move ProduceRequest to kafka.protocol.produce
Diffstat (limited to 'kafka/protocol/produce.py')
-rw-r--r--kafka/protocol/produce.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
new file mode 100644
index 0000000..b875397
--- /dev/null
+++ b/kafka/protocol/produce.py
@@ -0,0 +1,59 @@
+from .api import AbstractRequest, AbstractResponse, MessageSet
+from .types import Int8, Int16, Int32, Int64, Bytes, String, Array
+
+
+class ProduceRequest(AbstractRequest):
+ API_KEY = 0
+ API_VERSION = 0
+ __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression')
+
+ def __init__(self, topic_partition_messages,
+ required_acks=-1, timeout=1000, compression=None):
+ """
+ topic_partition_messages is a dict of dicts of lists (of messages)
+ {
+ "TopicFoo": {
+ 0: [
+ Message('foo'),
+ Message('bar')
+ ],
+ 1: [
+ Message('fizz'),
+ Message('buzz')
+ ]
+ }
+ }
+ """
+ self.required_acks = required_acks
+ self.timeout = timeout
+ self.topic_partition_messages = topic_partition_messages
+ self.compression = compression
+
+ @staticmethod
+ def _encode_messages(partition, messages, compression):
+ message_set = MessageSet.encode(messages)
+
+ if compression:
+ # compress message_set data and re-encode as single message
+ # then wrap single compressed message in a new message_set
+ pass
+
+ return (Int32.encode(partition) +
+ Int32.encode(len(message_set)) +
+ message_set)
+
+ def encode(self):
+ request = (
+ Int16.encode(self.required_acks) +
+ Int32.encode(self.timeout) +
+ Array.encode([(
+ String.encode(topic) +
+ Array.encode([
+ self._encode_messages(partition, messages, self.compression)
+ for partition, messages in partitions.iteritems()])
+ ) for topic, partitions in self.topic_partition_messages.iteritems()])
+ )
+ return super(ProduceRequest, self).encode(request)
+
+
+