diff options
| author | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:43:21 -0800 | 
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:43:21 -0800 | 
| commit | 97fd705a234fae1d4252e02a47ab0b6b70fde12b (patch) | |
| tree | 3ea4aad2fd4bc0e54e0bf7b4165bb0f00e3ad6a2 /kafka/producer/buffer.py | |
| parent | d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff) | |
| download | kafka-python-97fd705a234fae1d4252e02a47ab0b6b70fde12b.tar.gz | |
Support batch_size = 0 in producer buffersbatch_size_zero
Diffstat (limited to 'kafka/producer/buffer.py')
| -rw-r--r-- | kafka/producer/buffer.py | 10 | 
1 files changed, 6 insertions, 4 deletions
| diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 1a2dd71..a95bb87 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -30,8 +30,6 @@ class MessageSetBuffer(object):          'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),      }      def __init__(self, buf, batch_size, compression_type=None): -        assert batch_size > 0, 'batch_size must be > 0' -          if compression_type is not None:              assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'              checker, encoder, attributes = self._COMPRESSORS[compression_type] @@ -121,7 +119,7 @@ class SimpleBufferPool(object):          self._poolable_size = poolable_size          self._lock = threading.RLock() -        buffers = int(memory / poolable_size) +        buffers = int(memory / poolable_size) if poolable_size else 0          self._free = collections.deque([io.BytesIO() for _ in range(buffers)])          self._waiters = collections.deque() @@ -130,12 +128,13 @@ class SimpleBufferPool(object):          #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");          #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); -    def allocate(self, max_time_to_block_ms): +    def allocate(self, size, max_time_to_block_ms):          """          Allocate a buffer of the given size. This method blocks if there is not          enough memory and the buffer pool is configured with blocking mode.          Arguments: +            size (int): The buffer size to allocate in bytes [ignored]              max_time_to_block_ms (int): The maximum time in milliseconds to                  block for buffer memory to be available @@ -147,6 +146,9 @@ class SimpleBufferPool(object):              if self._free:                  return self._free.popleft() +            elif self._poolable_size == 0: +                return io.BytesIO() +              else:                  # we are out of buffers and will have to block                  buf = None | 
