diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl | 129 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 |
4 files changed, 43 insertions, 134 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 72325414cd..916a241062 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -549,9 +549,8 @@ i(Item, _) -> throw({bad_argument, Item}). report_memory(Hib, State = #q { mixed_state = MS }) -> - {MS1, MSize, Gain, Loss} = - rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), - rabbit_memory_manager:report_memory(self(), MSize, Gain, Loss, Hib), + {MS1, MSize} = rabbit_mixed_queue:estimate_queue_memory(MS), + rabbit_memory_manager:report_memory(self(), MSize, Hib), State #q { mixed_state = MS1 }. %--------------------------------------------------------------------------- diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl index ab5b545abd..44582dc424 100644 --- a/src/rabbit_memory_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -38,11 +38,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/5, report_memory/3, report_memory/5, info/0, - conserve_memory/2]). +-export([register/5, report_memory/3, info/0, conserve_memory/2]). -define(TOTAL_TOKENS, 10000000). --define(ACTIVITY_THRESHOLD, 25). -define(SERVER, ?MODULE). @@ -52,10 +50,6 @@ ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok'). -spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). --spec(report_memory/5 :: (pid(), non_neg_integer(), - (non_neg_integer() | 'undefined'), - (non_neg_integer() | 'undefined'), bool()) -> - 'ok'). -spec(info/0 :: () -> [{atom(), any()}]). -spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). @@ -65,7 +59,6 @@ liberated_processes, callbacks, tokens_per_byte, - lowrate, hibernate, unoppressable, alarmed @@ -85,19 +78,13 @@ %% %% There are a finite number of tokens in the system. These are %% allocated to processes as the processes report their memory -%% usage. We keep track of processes which have hibernated, and -%% processes that are doing only a low rate of work (reported as a low -%% gain or loss in memory between memory reports). When a process -%% reports memory use which can't be satisfied by the available -%% tokens, we try and oppress processes first from the hibernated -%% group, and then from the lowrate group. The hibernated group is a -%% simple queue, and so is implicitly sorted by the order in which -%% processes were added to the queue. This means that when removing -%% from the queue, we evict the sleepiest (and most passive) pid -%% first. The lowrate group is a priority queue, where the priority is -%% the truncated log (base e) of the amount of memory allocated. Thus -%% when we remove from the queue, we first remove the queue from the -%% highest bucket. +%% usage. We keep track of processes which have hibernated. When a +%% process reports memory use which can't be satisfied by the +%% available tokens, we try and oppress processes first from the +%% hibernated group. The hibernated group is a simple queue, and so is +%% implicitly sorted by the order in which processes were added to the +%% queue. This means that when removing from the queue, we evict the +%% sleepiest (and most passive) pid first. %% %% If the reported memory use still can't be satisfied after %% oppressing everyone from those two groups (and note that we check @@ -123,13 +110,13 @@ %% processes in the way described above), it will be liberated. We do %% not keep any information about oppressed processes. %% -%% Note that the lowrate and hibernate groups can get very out of -%% date. This is fine, and somewhat unavoidable given the absence of -%% useful APIs for queues. Thus we allow them to get out of date -%% (processes will be left in there when they change groups, -%% duplicates can appear, dead processes are not pruned etc etc etc), -%% and when we go through the groups, summing up their allocated -%% tokens, we tidy up at that point. +%% Note that the hibernate group can get very out of date. This is +%% fine, and somewhat unavoidable given the absence of useful APIs for +%% queues. Thus we allow them to get out of date (processes will be +%% left in there when they change groups, duplicates can appear, dead +%% processes are not pruned etc etc etc), and when we go through the +%% groups, summing up their allocated tokens, we tidy up at that +%% point. %% %% A liberated process, which is reporting a smaller amount of RAM %% than its last report will remain liberated. A liberated process @@ -162,11 +149,7 @@ register(Pid, Unoppressable, Module, Function, Args) -> Module, Function, Args}). report_memory(Pid, Memory, Hibernating) -> - report_memory(Pid, Memory, undefined, undefined, Hibernating). - -report_memory(Pid, Memory, Gain, Loss, Hibernating) -> - gen_server2:cast(?SERVER, - {report_memory, Pid, Memory, Gain, Loss, Hibernating}). + gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Hibernating}). info() -> gen_server2:call(?SERVER, info). @@ -186,7 +169,6 @@ init([]) -> liberated_processes = dict:new(), callbacks = dict:new(), tokens_per_byte = TPB, - lowrate = priority_queue:new(), hibernate = queue:new(), unoppressable = sets:new(), alarmed = false @@ -195,34 +177,25 @@ init([]) -> handle_call(info, _From, State) -> State1 = #state { available_tokens = Avail, liberated_processes = Libre, - lowrate = Lazy, hibernate = Sleepy, unoppressable = Unoppressable } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, { liberated_processes, dict:to_list(Libre) }, - { lowrate_processes, priority_queue:to_list(Lazy)}, { hibernated_processes, queue:to_list(Sleepy) }, { unoppressable_processes, sets:to_list(Unoppressable) }], State1}. -handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, +handle_cast({report_memory, Pid, Memory, Hibernating}, State = #state { liberated_processes = Libre, available_tokens = Avail, callbacks = Callbacks, tokens_per_byte = TPB, alarmed = Alarmed }) -> Req = rabbit_misc:ceil(TPB * Memory), - LowRate = case {BytesGained, BytesLost} of - {undefined, _} -> false; - {_, undefined} -> false; - {G, L} -> G < ?ACTIVITY_THRESHOLD andalso - L < ?ACTIVITY_THRESHOLD - end, LibreActivity = if Hibernating -> hibernate; - LowRate -> lowrate; true -> active end, - {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} = + {StateN = #state { hibernate = Sleepy }, ActivityNew} = case find_process(Pid, Libre) of {libre, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, @@ -250,7 +223,7 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, State1 = #state { available_tokens = Avail1, liberated_processes = Libre1 } = free_upto(Pid, Req, State), - case Req > Avail1 orelse Hibernating orelse LowRate of + case Req > Avail1 orelse Hibernating of true -> %% not enough space, or no compelling %% reason, so stay oppressed @@ -270,8 +243,6 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, case ActivityNew of active -> StateN; oppressed -> StateN; - lowrate -> - StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) }; hibernate -> StateN #state { hibernate = queue:in(Pid, Sleepy) } end, @@ -316,12 +287,6 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -add_to_lowrate(Pid, Alloc, Lazy) -> - Bucket = if Alloc == 0 -> 0; %% can't take log(0) - true -> trunc(math:log(Alloc)) %% log base e - end, - priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy). - find_process(Pid, Libre) -> case dict:find(Pid, Libre) of {ok, Value} -> {libre, Value}; @@ -332,18 +297,6 @@ set_process_mode(Callbacks, Pid, Mode) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), erlang:apply(Module, Function, Args ++ [Mode]). -tidy_and_sum_lazy(IgnorePids, Lazy, Libre) -> - tidy_and_sum(lowrate, Libre, - fun (Lazy1) -> - case priority_queue:out(Lazy1) of - {empty, Lazy2} -> - {empty, Lazy2}; - {{value, {Pid, _Bucket, _Alloc}}, Lazy2} -> - {{value, Pid}, Lazy2} - end - end, fun add_to_lowrate/3, IgnorePids, Lazy, - priority_queue:new(), 0). - tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) -> tidy_and_sum(hibernate, Libre, fun queue:out/1, fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, @@ -372,22 +325,6 @@ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_lazy(IgnorePids, Callbacks, Lazy, Libre, Req) -> - free_from( - Callbacks, - fun(_Libre, Lazy1, LazyAcc) -> - case priority_queue:out(Lazy1) of - {empty, _Lazy2} -> - empty; - {{value, V = {Pid, Bucket, Alloc}}, Lazy2} -> - case sets:is_element(Pid, IgnorePids) of - true -> {skip, Lazy2, - priority_queue:in(V, Bucket, LazyAcc)}; - false -> {value, Lazy2, Pid, Alloc} - end - end - end, fun priority_queue:join/2, Libre, Lazy, priority_queue:new(), Req). - free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> free_from(Callbacks, fun(Libre1, Sleepy1, SleepyAcc) -> @@ -425,34 +362,18 @@ free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) -> free_upto(Pid, Req, State = #state { available_tokens = Avail, liberated_processes = Libre, callbacks = Callbacks, - lowrate = Lazy, hibernate = Sleepy, unoppressable = Unoppressable }) when Req > Avail -> Unoppressable1 = sets:add_element(Pid, Unoppressable), {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre), case Req > Avail + SleepySum of - true -> %% not enough in sleepy, have a look in lazy too - {Lazy1, LazySum} = tidy_and_sum_lazy(Unoppressable1, Lazy, Libre), - case Req > Avail + SleepySum + LazySum of - true -> %% can't free enough, just return tidied state - State #state { lowrate = Lazy1, hibernate = Sleepy1 }; - false -> %% need to free all of sleepy, and some of lazy - {Sleepy2, Libre1, ReqRem} = - free_upto_sleepy(Unoppressable1, Callbacks, - Sleepy1, Libre, Req), - {Lazy2, Libre2, ReqRem1} = - free_upto_lazy(Unoppressable1, Callbacks, - Lazy1, Libre1, ReqRem), - %% ReqRem1 will be <= 0 because it's - %% likely we'll have freed more than we - %% need, thus Req - ReqRem1 is total freed - State #state { available_tokens = Avail + (Req - ReqRem1), - liberated_processes = Libre2, - lowrate = Lazy2, - hibernate = Sleepy2 } - end; - false -> %% enough available in sleepy, don't touch lazy + true -> %% not enough in sleepy, just return tidied state + State #state { hibernate = Sleepy1 }; + false -> + %% ReqRem1 will be <= 0 because it's likely we'll have + %% freed more than we need, thus Req - ReqRem1 is total + %% freed {Sleepy2, Libre1, ReqRem} = free_upto_sleepy(Unoppressable1, Callbacks, Sleepy1, Libre, Req), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 251c2046dc..bbec524b7f 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -40,7 +40,7 @@ len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). -export([set_storage_mode/3, storage_mode/1, - estimate_queue_memory_and_reset_counters/1]). + estimate_queue_memory/1]). -record(mqstate, { mode, msg_buf, @@ -48,8 +48,6 @@ is_durable, length, memory_size, - memory_gain, - memory_loss, prefetcher } ). @@ -67,8 +65,6 @@ is_durable :: boolean(), length :: non_neg_integer(), memory_size :: (non_neg_integer() | 'undefined'), - memory_gain :: (non_neg_integer() | 'undefined'), - memory_loss :: (non_neg_integer() | 'undefined'), prefetcher :: (pid() | 'undefined') }). -type(acktag() :: ( 'no_on_disk' | { non_neg_integer(), non_neg_integer() })). @@ -92,9 +88,8 @@ -spec(is_empty/1 :: (mqstate()) -> boolean()). -spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). --spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) -> - {mqstate(), non_neg_integer(), non_neg_integer(), - non_neg_integer()}). +-spec(estimate_queue_memory/1 :: (mqstate()) -> + {mqstate(), non_neg_integer()}). -spec(storage_mode/1 :: (mqstate()) -> mode()). -endif. @@ -126,8 +121,7 @@ init(Queue, IsDurable) -> MsgBuf = inc_queue_length(queue:new(), Len1), {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, is_durable = IsDurable, length = Len1, - memory_size = Size, memory_gain = undefined, - memory_loss = undefined, prefetcher = undefined }}. + memory_size = Size, prefetcher = undefined }}. publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, @@ -492,9 +486,8 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), {[], []}. -estimate_queue_memory_and_reset_counters(State = - #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> - {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}. +estimate_queue_memory(State = #mqstate { memory_size = Size }) -> + {State, 4 * Size}. storage_mode(#mqstate { mode = Mode }) -> Mode. @@ -515,15 +508,11 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> Msg #basic_message { content = rabbit_binary_generator:ensure_content_encoded(Content) }. -gain_memory(Inc, State = #mqstate { memory_size = QSize, - memory_gain = Gain }) -> - State #mqstate { memory_size = QSize + Inc, - memory_gain = Gain + Inc }. +gain_memory(Inc, State = #mqstate { memory_size = QSize }) -> + State #mqstate { memory_size = QSize + Inc }. -lose_memory(Dec, State = #mqstate { memory_size = QSize, - memory_loss = Loss }) -> - State #mqstate { memory_size = QSize - Dec, - memory_loss = Loss + Dec }. +lose_memory(Dec, State = #mqstate { memory_size = QSize }) -> + State #mqstate { memory_size = QSize - Dec }. inc_queue_length(MsgBuf, 0) -> MsgBuf; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 44abdda415..039e9aa487 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1081,8 +1081,8 @@ rdq_test_purge() -> rdq_new_mixed_queue(Q, Durable, Disk) -> {ok, MS} = rabbit_mixed_queue:init(Q, Durable), - {MS1, _, _, _} = - rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), + {MS1, _} = + rabbit_mixed_queue:estimate_queue_memory(MS), case Disk of true -> {ok, MS2} = rabbit_mixed_queue:set_storage_mode(disk, [], MS1), MS2; @@ -1115,15 +1115,15 @@ rdq_test_mixed_queue_modes() -> end, MS4, lists:seq(1,10)), 30 = rabbit_mixed_queue:len(MS6), io:format("Published a mixture of messages; ~w~n", - [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]), + [rabbit_mixed_queue:estimate_queue_memory(MS6)]), {ok, MS7} = rabbit_mixed_queue:set_storage_mode(disk, [], MS6), 30 = rabbit_mixed_queue:len(MS7), io:format("Converted to disk only mode; ~w~n", - [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]), + [rabbit_mixed_queue:estimate_queue_memory(MS7)]), {ok, MS8} = rabbit_mixed_queue:set_storage_mode(mixed, [], MS7), 30 = rabbit_mixed_queue:len(MS8), io:format("Converted to mixed mode; ~w~n", - [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]), + [rabbit_mixed_queue:estimate_queue_memory(MS8)]), MS10 = lists:foldl( fun (N, MS9) -> @@ -1162,7 +1162,7 @@ rdq_test_mixed_queue_modes() -> rdq_start(), MS17 = rdq_new_mixed_queue(q, true, false), 0 = rabbit_mixed_queue:len(MS17), - {MS17,0,0,0} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS17), + {MS17,0} = rabbit_mixed_queue:estimate_queue_memory(MS17), io:format("Recovered queue~n"), rdq_stop(), passed. |
