diff options
author | Dana Powers <dana.powers@rd.io> | 2015-11-23 05:26:13 +0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:39 -0800 |
commit | 3f65ff4ab93f2282af442e6bb5e54e3af1d602db (patch) | |
tree | a800a231d7ad0a0ace40db2d59b7d6fe7a42ab38 /kafka/protocol/api.py | |
parent | a0be374ce36f00ebb11a1e211ecee715999d9e8b (diff) | |
download | kafka-python-3f65ff4ab93f2282af442e6bb5e54e3af1d602db.tar.gz |
Move ProduceRequest to kafka.protocol.produce
Diffstat (limited to 'kafka/protocol/api.py')
-rw-r--r-- | kafka/protocol/api.py | 54 |
1 files changed, 0 insertions, 54 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index cbaf828..8ea820b 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -58,60 +58,6 @@ class AbstractRequest(AbstractRequestResponse): return super(AbstractRequest, cls).encode(request) -class ProduceRequest(AbstractRequest): - API_KEY = 0 - API_VERSION = 0 - __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression') - - def __init__(self, topic_partition_messages, - required_acks=-1, timeout=1000, compression=None): - """ - topic_partition_messages is a dict of dicts of lists (of messages) - { - "TopicFoo": { - 0: [ - Message('foo'), - Message('bar') - ], - 1: [ - Message('fizz'), - Message('buzz') - ] - } - } - """ - self.required_acks = required_acks - self.timeout = timeout - self.topic_partition_messages = topic_partition_messages - self.compression = compression - - @staticmethod - def _encode_messages(partition, messages, compression): - message_set = MessageSet.encode(messages) - - if compression: - # compress message_set data and re-encode as single message - # then wrap single compressed message in a new message_set - pass - - return (Int32.encode(partition) + - Int32.encode(len(message_set)) + - message_set) - - def encode(self): - request = ( - Int16.encode(self.required_acks) + - Int32.encode(self.timeout) + - Array.encode([( - String.encode(topic) + - Array.encode([ - self._encode_messages(partition, messages, self.compression) - for partition, messages in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_messages.iteritems()]) - ) - return super(ProduceRequest, self).encode(request) - - class FetchRequest(AbstractRequest): API_KEY = 1 API_VERSION = 0 |