diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-05-05 22:07:50 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-05-06 11:49:04 +0200 |
| commit | e0066cac5c443464a36b3eb4f68b9fd8fb860e38 (patch) | |
| tree | 06f3a5947a7c5ee048ac92e01ccc6e1362d94da8 | |
| parent | c29a33fd39822623ef5743439ffbe1f393bff3d3 (diff) | |
| download | rabbitmq-server-git-e0066cac5c443464a36b3eb4f68b9fd8fb860e38.tar.gz | |
File handle cache: Pay attention to memory use
... and clear read cache if necessary.
This solves an issue where sync'ing a mirrored queue could take all
available memory (way above the high watermark) and possibly crash the
node.
Fixes #134.
| -rw-r--r-- | src/file_handle_cache.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 21 |
2 files changed, 47 insertions, 6 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d0fd524fb8..cd7ba6d389 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -353,6 +353,7 @@ read(Ref, Count) -> read_buffer_rem = BufRem - Count, read_buffer_usage = BufUsg + Count }]}; ([Handle0]) -> + maybe_reduce_read_cache([Ref]), Handle = #handle{read_buffer = Buf, read_buffer_pos = BufPos, read_buffer_rem = BufRem, @@ -962,6 +963,37 @@ tune_read_buffer_limit(Handle = #handle{read_buffer = Buf, false -> Usg * 2 end, Lim)}. +maybe_reduce_read_cache(SparedRefs) -> + case rabbit_memory_monitor:memory_use(bytes) of + {_, infinity} -> ok; + {MemUse, MemLimit} when MemUse < MemLimit -> ok; + {MemUse, MemLimit} -> reduce_read_cache( + (MemUse - MemLimit) * 2, + SparedRefs) + end. + +reduce_read_cache(MemToFree, SparedRefs) -> + Handles = lists:sort( + fun({_, H1}, {_, H2}) -> H1 < H2 end, + [{R, H} || {{R, fhc_handle}, H} <- get(), + not lists:member(R, SparedRefs) + andalso size(H#handle.read_buffer) > 0]), + FreedMem = lists:foldl( + fun + (_, Freed) when Freed >= MemToFree -> + Freed; + ({Ref, #handle{read_buffer = Buf} = Handle}, Freed) -> + Handle1 = reset_read_buffer(Handle), + put({Ref, fhc_handle}, Handle1), + Freed + size(Buf) + end, 0, Handles), + if + FreedMem < MemToFree andalso SparedRefs =/= [] -> + reduce_read_cache(MemToFree - FreedMem, []); + true -> + ok + end. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 451ee1f443..049e65ba4c 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -25,7 +25,7 @@ -behaviour(gen_server2). -export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0, conserve_resources/3]). + report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -92,6 +92,19 @@ conserve_resources(Pid, disk, Conserve) -> conserve_resources(_Pid, _Source, _Conserve) -> ok. +memory_use(bytes) -> + MemoryLimit = vm_memory_monitor:get_memory_limit(), + {erlang:memory(total), case MemoryLimit > 0.0 of + true -> MemoryLimit; + false -> infinity + end}; +memory_use(ratio) -> + MemoryLimit = vm_memory_monitor:get_memory_limit(), + case MemoryLimit > 0.0 of + true -> erlang:memory(total) / MemoryLimit; + false -> infinity + end. + %%---------------------------------------------------------------------------- %% Gen_server callbacks %%---------------------------------------------------------------------------- @@ -223,11 +236,7 @@ desired_duration_average(#state{disk_alarm = false, queue_duration_count = Count}) -> {ok, LimitThreshold} = application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), - MemoryLimit = vm_memory_monitor:get_memory_limit(), - MemoryRatio = case MemoryLimit > 0.0 of - true -> erlang:memory(total) / MemoryLimit; - false -> infinity - end, + MemoryRatio = memory_use(ratio), if MemoryRatio =:= infinity -> 0.0; MemoryRatio < LimitThreshold orelse Count == 0 -> |
