diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:49:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:49:37 -0800 |
commit | 799f53f71275aec7a32c2935837f7c8f3d6283c0 (patch) | |
tree | ee62554485cf3e2fef15dd8fe6cbaf0215727d6e /kafka/producer/buffer.py | |
parent | 72fa7ef4fdb5be215aab7a075ad2257acbb059aa (diff) | |
download | kafka-python-799f53f71275aec7a32c2935837f7c8f3d6283c0.tar.gz |
Fix bug in SimpleBufferPool memory condition waiting / timeout
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index a95bb87..74ba5da 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -158,15 +158,16 @@ class SimpleBufferPool(object): # enough memory to allocate one while buf is None: start_wait = time.time() - if not more_memory.wait(max_time_to_block_ms / 1000.0): - raise Errors.KafkaTimeoutError( - "Failed to allocate memory within the configured" - " max blocking time") + more_memory.wait(max_time_to_block_ms / 1000.0) end_wait = time.time() #this.waitTime.record(endWait - startWait, time.milliseconds()); if self._free: buf = self._free.popleft() + else: + raise Errors.KafkaTimeoutError( + "Failed to allocate memory within the configured" + " max blocking time") # remove the condition for this thread to let the next thread # in line start getting memory |