summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_memory_manager.erl129
-rw-r--r--src/rabbit_mixed_queue.erl31
-rw-r--r--src/rabbit_tests.erl12
4 files changed, 43 insertions, 134 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 72325414cd..916a241062 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -549,9 +549,8 @@ i(Item, _) ->
throw({bad_argument, Item}).
report_memory(Hib, State = #q { mixed_state = MS }) ->
- {MS1, MSize, Gain, Loss} =
- rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
- rabbit_memory_manager:report_memory(self(), MSize, Gain, Loss, Hib),
+ {MS1, MSize} = rabbit_mixed_queue:estimate_queue_memory(MS),
+ rabbit_memory_manager:report_memory(self(), MSize, Hib),
State #q { mixed_state = MS1 }.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl
index ab5b545abd..44582dc424 100644
--- a/src/rabbit_memory_manager.erl
+++ b/src/rabbit_memory_manager.erl
@@ -38,11 +38,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/5, report_memory/3, report_memory/5, info/0,
- conserve_memory/2]).
+-export([register/5, report_memory/3, info/0, conserve_memory/2]).
-define(TOTAL_TOKENS, 10000000).
--define(ACTIVITY_THRESHOLD, 25).
-define(SERVER, ?MODULE).
@@ -52,10 +50,6 @@
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok').
-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
--spec(report_memory/5 :: (pid(), non_neg_integer(),
- (non_neg_integer() | 'undefined'),
- (non_neg_integer() | 'undefined'), bool()) ->
- 'ok').
-spec(info/0 :: () -> [{atom(), any()}]).
-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
@@ -65,7 +59,6 @@
liberated_processes,
callbacks,
tokens_per_byte,
- lowrate,
hibernate,
unoppressable,
alarmed
@@ -85,19 +78,13 @@
%%
%% There are a finite number of tokens in the system. These are
%% allocated to processes as the processes report their memory
-%% usage. We keep track of processes which have hibernated, and
-%% processes that are doing only a low rate of work (reported as a low
-%% gain or loss in memory between memory reports). When a process
-%% reports memory use which can't be satisfied by the available
-%% tokens, we try and oppress processes first from the hibernated
-%% group, and then from the lowrate group. The hibernated group is a
-%% simple queue, and so is implicitly sorted by the order in which
-%% processes were added to the queue. This means that when removing
-%% from the queue, we evict the sleepiest (and most passive) pid
-%% first. The lowrate group is a priority queue, where the priority is
-%% the truncated log (base e) of the amount of memory allocated. Thus
-%% when we remove from the queue, we first remove the queue from the
-%% highest bucket.
+%% usage. We keep track of processes which have hibernated. When a
+%% process reports memory use which can't be satisfied by the
+%% available tokens, we try and oppress processes first from the
+%% hibernated group. The hibernated group is a simple queue, and so is
+%% implicitly sorted by the order in which processes were added to the
+%% queue. This means that when removing from the queue, we evict the
+%% sleepiest (and most passive) pid first.
%%
%% If the reported memory use still can't be satisfied after
%% oppressing everyone from those two groups (and note that we check
@@ -123,13 +110,13 @@
%% processes in the way described above), it will be liberated. We do
%% not keep any information about oppressed processes.
%%
-%% Note that the lowrate and hibernate groups can get very out of
-%% date. This is fine, and somewhat unavoidable given the absence of
-%% useful APIs for queues. Thus we allow them to get out of date
-%% (processes will be left in there when they change groups,
-%% duplicates can appear, dead processes are not pruned etc etc etc),
-%% and when we go through the groups, summing up their allocated
-%% tokens, we tidy up at that point.
+%% Note that the hibernate group can get very out of date. This is
+%% fine, and somewhat unavoidable given the absence of useful APIs for
+%% queues. Thus we allow them to get out of date (processes will be
+%% left in there when they change groups, duplicates can appear, dead
+%% processes are not pruned etc etc etc), and when we go through the
+%% groups, summing up their allocated tokens, we tidy up at that
+%% point.
%%
%% A liberated process, which is reporting a smaller amount of RAM
%% than its last report will remain liberated. A liberated process
@@ -162,11 +149,7 @@ register(Pid, Unoppressable, Module, Function, Args) ->
Module, Function, Args}).
report_memory(Pid, Memory, Hibernating) ->
- report_memory(Pid, Memory, undefined, undefined, Hibernating).
-
-report_memory(Pid, Memory, Gain, Loss, Hibernating) ->
- gen_server2:cast(?SERVER,
- {report_memory, Pid, Memory, Gain, Loss, Hibernating}).
+ gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Hibernating}).
info() ->
gen_server2:call(?SERVER, info).
@@ -186,7 +169,6 @@ init([]) ->
liberated_processes = dict:new(),
callbacks = dict:new(),
tokens_per_byte = TPB,
- lowrate = priority_queue:new(),
hibernate = queue:new(),
unoppressable = sets:new(),
alarmed = false
@@ -195,34 +177,25 @@ init([]) ->
handle_call(info, _From, State) ->
State1 = #state { available_tokens = Avail,
liberated_processes = Libre,
- lowrate = Lazy,
hibernate = Sleepy,
unoppressable = Unoppressable } =
free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
{reply, [{ available_tokens, Avail },
{ liberated_processes, dict:to_list(Libre) },
- { lowrate_processes, priority_queue:to_list(Lazy)},
{ hibernated_processes, queue:to_list(Sleepy) },
{ unoppressable_processes, sets:to_list(Unoppressable) }], State1}.
-handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
+handle_cast({report_memory, Pid, Memory, Hibernating},
State = #state { liberated_processes = Libre,
available_tokens = Avail,
callbacks = Callbacks,
tokens_per_byte = TPB,
alarmed = Alarmed }) ->
Req = rabbit_misc:ceil(TPB * Memory),
- LowRate = case {BytesGained, BytesLost} of
- {undefined, _} -> false;
- {_, undefined} -> false;
- {G, L} -> G < ?ACTIVITY_THRESHOLD andalso
- L < ?ACTIVITY_THRESHOLD
- end,
LibreActivity = if Hibernating -> hibernate;
- LowRate -> lowrate;
true -> active
end,
- {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} =
+ {StateN = #state { hibernate = Sleepy }, ActivityNew} =
case find_process(Pid, Libre) of
{libre, {OAlloc, _OActivity}} ->
Avail1 = Avail + OAlloc,
@@ -250,7 +223,7 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
State1 = #state { available_tokens = Avail1,
liberated_processes = Libre1 } =
free_upto(Pid, Req, State),
- case Req > Avail1 orelse Hibernating orelse LowRate of
+ case Req > Avail1 orelse Hibernating of
true ->
%% not enough space, or no compelling
%% reason, so stay oppressed
@@ -270,8 +243,6 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
case ActivityNew of
active -> StateN;
oppressed -> StateN;
- lowrate ->
- StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) };
hibernate ->
StateN #state { hibernate = queue:in(Pid, Sleepy) }
end,
@@ -316,12 +287,6 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-add_to_lowrate(Pid, Alloc, Lazy) ->
- Bucket = if Alloc == 0 -> 0; %% can't take log(0)
- true -> trunc(math:log(Alloc)) %% log base e
- end,
- priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy).
-
find_process(Pid, Libre) ->
case dict:find(Pid, Libre) of
{ok, Value} -> {libre, Value};
@@ -332,18 +297,6 @@ set_process_mode(Callbacks, Pid, Mode) ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
erlang:apply(Module, Function, Args ++ [Mode]).
-tidy_and_sum_lazy(IgnorePids, Lazy, Libre) ->
- tidy_and_sum(lowrate, Libre,
- fun (Lazy1) ->
- case priority_queue:out(Lazy1) of
- {empty, Lazy2} ->
- {empty, Lazy2};
- {{value, {Pid, _Bucket, _Alloc}}, Lazy2} ->
- {{value, Pid}, Lazy2}
- end
- end, fun add_to_lowrate/3, IgnorePids, Lazy,
- priority_queue:new(), 0).
-
tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) ->
tidy_and_sum(hibernate, Libre, fun queue:out/1,
fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
@@ -372,22 +325,6 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet,
DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
end.
-free_upto_lazy(IgnorePids, Callbacks, Lazy, Libre, Req) ->
- free_from(
- Callbacks,
- fun(_Libre, Lazy1, LazyAcc) ->
- case priority_queue:out(Lazy1) of
- {empty, _Lazy2} ->
- empty;
- {{value, V = {Pid, Bucket, Alloc}}, Lazy2} ->
- case sets:is_element(Pid, IgnorePids) of
- true -> {skip, Lazy2,
- priority_queue:in(V, Bucket, LazyAcc)};
- false -> {value, Lazy2, Pid, Alloc}
- end
- end
- end, fun priority_queue:join/2, Libre, Lazy, priority_queue:new(), Req).
-
free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) ->
free_from(Callbacks,
fun(Libre1, Sleepy1, SleepyAcc) ->
@@ -425,34 +362,18 @@ free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) ->
free_upto(Pid, Req, State = #state { available_tokens = Avail,
liberated_processes = Libre,
callbacks = Callbacks,
- lowrate = Lazy,
hibernate = Sleepy,
unoppressable = Unoppressable })
when Req > Avail ->
Unoppressable1 = sets:add_element(Pid, Unoppressable),
{Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre),
case Req > Avail + SleepySum of
- true -> %% not enough in sleepy, have a look in lazy too
- {Lazy1, LazySum} = tidy_and_sum_lazy(Unoppressable1, Lazy, Libre),
- case Req > Avail + SleepySum + LazySum of
- true -> %% can't free enough, just return tidied state
- State #state { lowrate = Lazy1, hibernate = Sleepy1 };
- false -> %% need to free all of sleepy, and some of lazy
- {Sleepy2, Libre1, ReqRem} =
- free_upto_sleepy(Unoppressable1, Callbacks,
- Sleepy1, Libre, Req),
- {Lazy2, Libre2, ReqRem1} =
- free_upto_lazy(Unoppressable1, Callbacks,
- Lazy1, Libre1, ReqRem),
- %% ReqRem1 will be <= 0 because it's
- %% likely we'll have freed more than we
- %% need, thus Req - ReqRem1 is total freed
- State #state { available_tokens = Avail + (Req - ReqRem1),
- liberated_processes = Libre2,
- lowrate = Lazy2,
- hibernate = Sleepy2 }
- end;
- false -> %% enough available in sleepy, don't touch lazy
+ true -> %% not enough in sleepy, just return tidied state
+ State #state { hibernate = Sleepy1 };
+ false ->
+ %% ReqRem1 will be <= 0 because it's likely we'll have
+ %% freed more than we need, thus Req - ReqRem1 is total
+ %% freed
{Sleepy2, Libre1, ReqRem} =
free_upto_sleepy(Unoppressable1, Callbacks,
Sleepy1, Libre, Req),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 251c2046dc..bbec524b7f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -40,7 +40,7 @@
len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
-export([set_storage_mode/3, storage_mode/1,
- estimate_queue_memory_and_reset_counters/1]).
+ estimate_queue_memory/1]).
-record(mqstate, { mode,
msg_buf,
@@ -48,8 +48,6 @@
is_durable,
length,
memory_size,
- memory_gain,
- memory_loss,
prefetcher
}
).
@@ -67,8 +65,6 @@
is_durable :: boolean(),
length :: non_neg_integer(),
memory_size :: (non_neg_integer() | 'undefined'),
- memory_gain :: (non_neg_integer() | 'undefined'),
- memory_loss :: (non_neg_integer() | 'undefined'),
prefetcher :: (pid() | 'undefined')
}).
-type(acktag() :: ( 'no_on_disk' | { non_neg_integer(), non_neg_integer() })).
@@ -92,9 +88,8 @@
-spec(is_empty/1 :: (mqstate()) -> boolean()).
-spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
--spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) ->
- {mqstate(), non_neg_integer(), non_neg_integer(),
- non_neg_integer()}).
+-spec(estimate_queue_memory/1 :: (mqstate()) ->
+ {mqstate(), non_neg_integer()}).
-spec(storage_mode/1 :: (mqstate()) -> mode()).
-endif.
@@ -126,8 +121,7 @@ init(Queue, IsDurable) ->
MsgBuf = inc_queue_length(queue:new(), Len1),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
is_durable = IsDurable, length = Len1,
- memory_size = Size, memory_gain = undefined,
- memory_loss = undefined, prefetcher = undefined }}.
+ memory_size = Size, prefetcher = undefined }}.
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
@@ -492,9 +486,8 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
{[], []}.
-estimate_queue_memory_and_reset_counters(State =
- #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) ->
- {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}.
+estimate_queue_memory(State = #mqstate { memory_size = Size }) ->
+ {State, 4 * Size}.
storage_mode(#mqstate { mode = Mode }) ->
Mode.
@@ -515,15 +508,11 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
Msg #basic_message {
content = rabbit_binary_generator:ensure_content_encoded(Content) }.
-gain_memory(Inc, State = #mqstate { memory_size = QSize,
- memory_gain = Gain }) ->
- State #mqstate { memory_size = QSize + Inc,
- memory_gain = Gain + Inc }.
+gain_memory(Inc, State = #mqstate { memory_size = QSize }) ->
+ State #mqstate { memory_size = QSize + Inc }.
-lose_memory(Dec, State = #mqstate { memory_size = QSize,
- memory_loss = Loss }) ->
- State #mqstate { memory_size = QSize - Dec,
- memory_loss = Loss + Dec }.
+lose_memory(Dec, State = #mqstate { memory_size = QSize }) ->
+ State #mqstate { memory_size = QSize - Dec }.
inc_queue_length(MsgBuf, 0) ->
MsgBuf;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 44abdda415..039e9aa487 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1081,8 +1081,8 @@ rdq_test_purge() ->
rdq_new_mixed_queue(Q, Durable, Disk) ->
{ok, MS} = rabbit_mixed_queue:init(Q, Durable),
- {MS1, _, _, _} =
- rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
+ {MS1, _} =
+ rabbit_mixed_queue:estimate_queue_memory(MS),
case Disk of
true -> {ok, MS2} = rabbit_mixed_queue:set_storage_mode(disk, [], MS1),
MS2;
@@ -1115,15 +1115,15 @@ rdq_test_mixed_queue_modes() ->
end, MS4, lists:seq(1,10)),
30 = rabbit_mixed_queue:len(MS6),
io:format("Published a mixture of messages; ~w~n",
- [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS6)]),
{ok, MS7} = rabbit_mixed_queue:set_storage_mode(disk, [], MS6),
30 = rabbit_mixed_queue:len(MS7),
io:format("Converted to disk only mode; ~w~n",
- [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS7)]),
{ok, MS8} = rabbit_mixed_queue:set_storage_mode(mixed, [], MS7),
30 = rabbit_mixed_queue:len(MS8),
io:format("Converted to mixed mode; ~w~n",
- [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS8)]),
MS10 =
lists:foldl(
fun (N, MS9) ->
@@ -1162,7 +1162,7 @@ rdq_test_mixed_queue_modes() ->
rdq_start(),
MS17 = rdq_new_mixed_queue(q, true, false),
0 = rabbit_mixed_queue:len(MS17),
- {MS17,0,0,0} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS17),
+ {MS17,0} = rabbit_mixed_queue:estimate_queue_memory(MS17),
io:format("Recovered queue~n"),
rdq_stop(),
passed.