summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Baryshnikov <mephius@gmail.com>2017-02-07 22:31:06 +0300
committerDana Powers <dana.powers@gmail.com>2017-03-07 13:31:46 -0800
commit5a0e9715f45b62cfe43e6873b8828f49ab73f710 (patch)
tree8c6c47fdc7997b7fcefa6a1a92386a40d6ab9773
parent82d50f443e04356b2f051f7476bb4b4f5bd700d2 (diff)
downloadkafka-python-5a0e9715f45b62cfe43e6873b8828f49ab73f710.tar.gz
Fixed couple of "leaks" when gc is disabled (#979)
-rw-r--r--kafka/protocol/legacy.py29
-rw-r--r--kafka/protocol/message.py4
-rw-r--r--kafka/protocol/struct.py6
-rw-r--r--kafka/vendor/six.py4
4 files changed, 27 insertions, 16 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index c855d05..37145b7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -133,21 +133,26 @@ class KafkaProtocol(object):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+ topics = []
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
+ topic_msgs = []
+ for partition, payload in topic_payloads.items():
+ partition_msgs = []
+ for msg in payload.messages:
+ m = kafka.protocol.message.Message(
+ msg.value, key=msg.key,
+ magic=msg.magic, attributes=msg.attributes
+ )
+ partition_msgs.append((0, m.encode()))
+ topic_msgs.append((partition, partition_msgs))
+ topics.append((topic, topic_msgs))
+
+
return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
- topics=[(
- topic,
- [(
- partition,
- [(0,
- kafka.protocol.message.Message(
- msg.value, key=msg.key,
- magic=msg.magic, attributes=msg.attributes
- ).encode())
- for msg in payload.messages])
- for partition, payload in topic_payloads.items()])
- for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+ topics=topics
+ )
@classmethod
def decode_produce_response(cls, response):
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index bfad127..ec5ee6c 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -10,7 +10,7 @@ from .struct import Struct
from .types import (
Int8, Int32, Int64, Bytes, Schema, AbstractType
)
-from ..util import crc32
+from ..util import crc32, WeakMethod
class Message(Struct):
@@ -52,7 +52,7 @@ class Message(Struct):
self.attributes = attributes
self.key = key
self.value = value
- self.encode = self._encode_self
+ self.encode = WeakMethod(self._encode_self)
@property
def timestamp_type(self):
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index a3d28d7..4c1afcb 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -5,6 +5,8 @@ from io import BytesIO
from .abstract import AbstractType
from .types import Schema
+from ..util import WeakMethod
+
class Struct(AbstractType):
SCHEMA = Schema()
@@ -19,7 +21,9 @@ class Struct(AbstractType):
self.__dict__.update(kwargs)
# overloading encode() to support both class and instance
- self.encode = self._encode_self
+ # Without WeakMethod() this creates circular ref, which
+ # causes instances to "leak" to garbage
+ self.encode = WeakMethod(self._encode_self)
@classmethod
def encode(cls, item): # pylint: disable=E0202
diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py
index 808e651..a949b95 100644
--- a/kafka/vendor/six.py
+++ b/kafka/vendor/six.py
@@ -70,7 +70,9 @@ else:
else:
# 64-bit
MAXSIZE = int((1 << 63) - 1)
- del X
+
+ # Don't del it here, cause with gc disabled this "leaks" to garbage
+ # del X
def _add_doc(func, doc):