summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py29
1 files changed, 17 insertions, 12 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index c855d05..37145b7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -133,21 +133,26 @@ class KafkaProtocol(object):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+ topics = []
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
+ topic_msgs = []
+ for partition, payload in topic_payloads.items():
+ partition_msgs = []
+ for msg in payload.messages:
+ m = kafka.protocol.message.Message(
+ msg.value, key=msg.key,
+ magic=msg.magic, attributes=msg.attributes
+ )
+ partition_msgs.append((0, m.encode()))
+ topic_msgs.append((partition, partition_msgs))
+ topics.append((topic, topic_msgs))
+
+
return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
- topics=[(
- topic,
- [(
- partition,
- [(0,
- kafka.protocol.message.Message(
- msg.value, key=msg.key,
- magic=msg.magic, attributes=msg.attributes
- ).encode())
- for msg in payload.messages])
- for partition, payload in topic_payloads.items()])
- for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+ topics=topics
+ )
@classmethod
def decode_produce_response(cls, response):