summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-05-05 22:07:50 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-05-06 11:49:04 +0200
commite0066cac5c443464a36b3eb4f68b9fd8fb860e38 (patch)
tree06f3a5947a7c5ee048ac92e01ccc6e1362d94da8 /src
parentc29a33fd39822623ef5743439ffbe1f393bff3d3 (diff)
downloadrabbitmq-server-git-e0066cac5c443464a36b3eb4f68b9fd8fb860e38.tar.gz
File handle cache: Pay attention to memory use
... and clear read cache if necessary. This solves an issue where sync'ing a mirrored queue could take all available memory (way above the high watermark) and possibly crash the node. Fixes #134.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl32
-rw-r--r--src/rabbit_memory_monitor.erl21
2 files changed, 47 insertions, 6 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d0fd524fb8..cd7ba6d389 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -353,6 +353,7 @@ read(Ref, Count) ->
read_buffer_rem = BufRem - Count,
read_buffer_usage = BufUsg + Count }]};
([Handle0]) ->
+ maybe_reduce_read_cache([Ref]),
Handle = #handle{read_buffer = Buf,
read_buffer_pos = BufPos,
read_buffer_rem = BufRem,
@@ -962,6 +963,37 @@ tune_read_buffer_limit(Handle = #handle{read_buffer = Buf,
false -> Usg * 2
end, Lim)}.
+maybe_reduce_read_cache(SparedRefs) ->
+ case rabbit_memory_monitor:memory_use(bytes) of
+ {_, infinity} -> ok;
+ {MemUse, MemLimit} when MemUse < MemLimit -> ok;
+ {MemUse, MemLimit} -> reduce_read_cache(
+ (MemUse - MemLimit) * 2,
+ SparedRefs)
+ end.
+
+reduce_read_cache(MemToFree, SparedRefs) ->
+ Handles = lists:sort(
+ fun({_, H1}, {_, H2}) -> H1 < H2 end,
+ [{R, H} || {{R, fhc_handle}, H} <- get(),
+ not lists:member(R, SparedRefs)
+ andalso size(H#handle.read_buffer) > 0]),
+ FreedMem = lists:foldl(
+ fun
+ (_, Freed) when Freed >= MemToFree ->
+ Freed;
+ ({Ref, #handle{read_buffer = Buf} = Handle}, Freed) ->
+ Handle1 = reset_read_buffer(Handle),
+ put({Ref, fhc_handle}, Handle1),
+ Freed + size(Buf)
+ end, 0, Handles),
+ if
+ FreedMem < MemToFree andalso SparedRefs =/= [] ->
+ reduce_read_cache(MemToFree - FreedMem, []);
+ true ->
+ ok
+ end.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 451ee1f443..049e65ba4c 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -25,7 +25,7 @@
-behaviour(gen_server2).
-export([start_link/0, register/2, deregister/1,
- report_ram_duration/2, stop/0, conserve_resources/3]).
+ report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -92,6 +92,19 @@ conserve_resources(Pid, disk, Conserve) ->
conserve_resources(_Pid, _Source, _Conserve) ->
ok.
+memory_use(bytes) ->
+ MemoryLimit = vm_memory_monitor:get_memory_limit(),
+ {erlang:memory(total), case MemoryLimit > 0.0 of
+ true -> MemoryLimit;
+ false -> infinity
+ end};
+memory_use(ratio) ->
+ MemoryLimit = vm_memory_monitor:get_memory_limit(),
+ case MemoryLimit > 0.0 of
+ true -> erlang:memory(total) / MemoryLimit;
+ false -> infinity
+ end.
+
%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------
@@ -223,11 +236,7 @@ desired_duration_average(#state{disk_alarm = false,
queue_duration_count = Count}) ->
{ok, LimitThreshold} =
application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
- MemoryLimit = vm_memory_monitor:get_memory_limit(),
- MemoryRatio = case MemoryLimit > 0.0 of
- true -> erlang:memory(total) / MemoryLimit;
- false -> infinity
- end,
+ MemoryRatio = memory_use(ratio),
if MemoryRatio =:= infinity ->
0.0;
MemoryRatio < LimitThreshold orelse Count == 0 ->