diff options
| author | Dana Powers <dana.powers@gmail.com> | 2016-07-10 21:36:41 -0700 | 
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 14:05:49 -0700 | 
| commit | c34d13879641d27cceb9403a4e6617152dfda0f3 (patch) | |
| tree | 5e7208f13a59352097829fda76cf007e9aec2d53 /kafka/producer/buffer.py | |
| parent | 20f4c95289c694f81a60228a9820601eb57402f4 (diff) | |
| download | kafka-python-c34d13879641d27cceb9403a4e6617152dfda0f3.tar.gz | |
Add initial producer-sender metrics
Diffstat (limited to 'kafka/producer/buffer.py')
| -rw-r--r-- | kafka/producer/buffer.py | 20 | 
1 files changed, 15 insertions, 5 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5fcb35f..de5f0e7 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division  import collections  import io @@ -55,6 +55,8 @@ class MessageSetBuffer(object):          self._batch_size = batch_size          self._closed = False          self._messages = 0 +        self._bytes_written = 4 # Int32 header is 4 bytes +        self._final_size = None      def append(self, offset, message):          """Apend a Message to the MessageSet. @@ -62,6 +64,8 @@ class MessageSetBuffer(object):          Arguments:              offset (int): offset of the message              message (Message or bytes): message struct or encoded bytes + +        Returns: bytes written          """          if isinstance(message, Message):              encoded = message.encode() @@ -70,6 +74,8 @@ class MessageSetBuffer(object):          msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded          self._buffer.write(msg)          self._messages += 1 +        self._bytes_written += len(msg) +        return len(msg)      def has_room_for(self, key, value):          if self._closed: @@ -107,16 +113,20 @@ class MessageSetBuffer(object):                  self._buffer.write(Int32.encode(len(encoded)))                  self._buffer.write(encoded) -            # Update the message set size, and return ready for full read() -            size = self._buffer.tell() - 4 +            # Update the message set size (less the 4 byte header), +            # and return with buffer ready for full read() +            self._final_size = self._buffer.tell()              self._buffer.seek(0) -            self._buffer.write(Int32.encode(size)) +            self._buffer.write(Int32.encode(self._final_size - 4))          self._buffer.seek(0)          self._closed = True      def size_in_bytes(self): -        return self._buffer.tell() +        return self._final_size or self._buffer.tell() + +    def compression_rate(self): +        return self.size_in_bytes() / self._bytes_written      def buffer(self):          return self._buffer  | 
