diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:43:21 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:43:21 -0800 |
commit | 97fd705a234fae1d4252e02a47ab0b6b70fde12b (patch) | |
tree | 3ea4aad2fd4bc0e54e0bf7b4165bb0f00e3ad6a2 /kafka | |
parent | d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff) | |
download | kafka-python-batch_size_zero.tar.gz |
Support batch_size = 0 in producer buffersbatch_size_zero
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/buffer.py | 10 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 2 |
2 files changed, 7 insertions, 5 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 1a2dd71..a95bb87 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -30,8 +30,6 @@ class MessageSetBuffer(object): 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), } def __init__(self, buf, batch_size, compression_type=None): - assert batch_size > 0, 'batch_size must be > 0' - if compression_type is not None: assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' checker, encoder, attributes = self._COMPRESSORS[compression_type] @@ -121,7 +119,7 @@ class SimpleBufferPool(object): self._poolable_size = poolable_size self._lock = threading.RLock() - buffers = int(memory / poolable_size) + buffers = int(memory / poolable_size) if poolable_size else 0 self._free = collections.deque([io.BytesIO() for _ in range(buffers)]) self._waiters = collections.deque() @@ -130,12 +128,13 @@ class SimpleBufferPool(object): #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); - def allocate(self, max_time_to_block_ms): + def allocate(self, size, max_time_to_block_ms): """ Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool is configured with blocking mode. Arguments: + size (int): The buffer size to allocate in bytes [ignored] max_time_to_block_ms (int): The maximum time in milliseconds to block for buffer memory to be available @@ -147,6 +146,9 @@ class SimpleBufferPool(object): if self._free: return self._free.popleft() + elif self._poolable_size == 0: + return io.BytesIO() + else: # we are out of buffers and will have to block buf = None diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index c62926d..1e692ee 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -200,7 +200,7 @@ class RecordAccumulator(object): size = max(self.config['batch_size'], message_size) log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace - buf = self._free.allocate(max_time_to_block_ms) + buf = self._free.allocate(size, max_time_to_block_ms) with self._tp_locks[tp]: # Need to check if producer is closed again after grabbing the # dequeue lock. |