summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-18 21:49:37 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-18 21:49:37 -0800
commit799f53f71275aec7a32c2935837f7c8f3d6283c0 (patch)
treeee62554485cf3e2fef15dd8fe6cbaf0215727d6e /kafka/producer/buffer.py
parent72fa7ef4fdb5be215aab7a075ad2257acbb059aa (diff)
downloadkafka-python-799f53f71275aec7a32c2935837f7c8f3d6283c0.tar.gz
Fix bug in SimpleBufferPool memory condition waiting / timeout
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py9
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index a95bb87..74ba5da 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -158,15 +158,16 @@ class SimpleBufferPool(object):
# enough memory to allocate one
while buf is None:
start_wait = time.time()
- if not more_memory.wait(max_time_to_block_ms / 1000.0):
- raise Errors.KafkaTimeoutError(
- "Failed to allocate memory within the configured"
- " max blocking time")
+ more_memory.wait(max_time_to_block_ms / 1000.0)
end_wait = time.time()
#this.waitTime.record(endWait - startWait, time.milliseconds());
if self._free:
buf = self._free.popleft()
+ else:
+ raise Errors.KafkaTimeoutError(
+ "Failed to allocate memory within the configured"
+ " max blocking time")
# remove the condition for this thread to let the next thread
# in line start getting memory