diff options
Diffstat (limited to 'kafka/protocol/produce.py')
-rw-r--r-- | kafka/protocol/produce.py | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py new file mode 100644 index 0000000..b875397 --- /dev/null +++ b/kafka/protocol/produce.py @@ -0,0 +1,59 @@ +from .api import AbstractRequest, AbstractResponse, MessageSet +from .types import Int8, Int16, Int32, Int64, Bytes, String, Array + + +class ProduceRequest(AbstractRequest): + API_KEY = 0 + API_VERSION = 0 + __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression') + + def __init__(self, topic_partition_messages, + required_acks=-1, timeout=1000, compression=None): + """ + topic_partition_messages is a dict of dicts of lists (of messages) + { + "TopicFoo": { + 0: [ + Message('foo'), + Message('bar') + ], + 1: [ + Message('fizz'), + Message('buzz') + ] + } + } + """ + self.required_acks = required_acks + self.timeout = timeout + self.topic_partition_messages = topic_partition_messages + self.compression = compression + + @staticmethod + def _encode_messages(partition, messages, compression): + message_set = MessageSet.encode(messages) + + if compression: + # compress message_set data and re-encode as single message + # then wrap single compressed message in a new message_set + pass + + return (Int32.encode(partition) + + Int32.encode(len(message_set)) + + message_set) + + def encode(self): + request = ( + Int16.encode(self.required_acks) + + Int32.encode(self.timeout) + + Array.encode([( + String.encode(topic) + + Array.encode([ + self._encode_messages(partition, messages, self.compression) + for partition, messages in partitions.iteritems()]) + ) for topic, partitions in self.topic_partition_messages.iteritems()]) + ) + return super(ProduceRequest, self).encode(request) + + + |