summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
authorMax Baryshnikov <mephius@gmail.com>2017-02-07 22:31:06 +0300
committerDana Powers <dana.powers@gmail.com>2017-03-07 13:31:46 -0800
commit5a0e9715f45b62cfe43e6873b8828f49ab73f710 (patch)
tree8c6c47fdc7997b7fcefa6a1a92386a40d6ab9773 /kafka/protocol/legacy.py
parent82d50f443e04356b2f051f7476bb4b4f5bd700d2 (diff)
downloadkafka-python-5a0e9715f45b62cfe43e6873b8828f49ab73f710.tar.gz
Fixed couple of "leaks" when gc is disabled (#979)
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py29
1 files changed, 17 insertions, 12 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index c855d05..37145b7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -133,21 +133,26 @@ class KafkaProtocol(object):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+ topics = []
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
+ topic_msgs = []
+ for partition, payload in topic_payloads.items():
+ partition_msgs = []
+ for msg in payload.messages:
+ m = kafka.protocol.message.Message(
+ msg.value, key=msg.key,
+ magic=msg.magic, attributes=msg.attributes
+ )
+ partition_msgs.append((0, m.encode()))
+ topic_msgs.append((partition, partition_msgs))
+ topics.append((topic, topic_msgs))
+
+
return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
- topics=[(
- topic,
- [(
- partition,
- [(0,
- kafka.protocol.message.Message(
- msg.value, key=msg.key,
- magic=msg.magic, attributes=msg.attributes
- ).encode())
- for msg in payload.messages])
- for partition, payload in topic_payloads.items()])
- for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+ topics=topics
+ )
@classmethod
def decode_produce_response(cls, response):