summaryrefslogtreecommitdiff
path: root/kafka/protocol/produce.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/produce.py')
-rw-r--r--kafka/protocol/produce.py81
1 files changed, 26 insertions, 55 deletions
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index b875397..532a702 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -1,59 +1,30 @@
-from .api import AbstractRequest, AbstractResponse, MessageSet
-from .types import Int8, Int16, Int32, Int64, Bytes, String, Array
+from .message import MessageSet
+from .struct import Struct
+from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema
-class ProduceRequest(AbstractRequest):
+class ProduceRequest(Struct):
API_KEY = 0
API_VERSION = 0
- __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression')
-
- def __init__(self, topic_partition_messages,
- required_acks=-1, timeout=1000, compression=None):
- """
- topic_partition_messages is a dict of dicts of lists (of messages)
- {
- "TopicFoo": {
- 0: [
- Message('foo'),
- Message('bar')
- ],
- 1: [
- Message('fizz'),
- Message('buzz')
- ]
- }
- }
- """
- self.required_acks = required_acks
- self.timeout = timeout
- self.topic_partition_messages = topic_partition_messages
- self.compression = compression
-
- @staticmethod
- def _encode_messages(partition, messages, compression):
- message_set = MessageSet.encode(messages)
-
- if compression:
- # compress message_set data and re-encode as single message
- # then wrap single compressed message in a new message_set
- pass
-
- return (Int32.encode(partition) +
- Int32.encode(len(message_set)) +
- message_set)
-
- def encode(self):
- request = (
- Int16.encode(self.required_acks) +
- Int32.encode(self.timeout) +
- Array.encode([(
- String.encode(topic) +
- Array.encode([
- self._encode_messages(partition, messages, self.compression)
- for partition, messages in partitions.iteritems()])
- ) for topic, partitions in self.topic_partition_messages.iteritems()])
- )
- return super(ProduceRequest, self).encode(request)
-
-
-
+ SCHEMA = Schema(
+ ('required_acks', Int16),
+ ('timeout', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('messages', MessageSet)))))
+ )
+
+
+class ProduceResponse(Struct):
+ API_KEY = 0
+ API_VERSION = 0
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offset', Int64)))))
+ )