diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-03 17:43:52 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-03 17:43:52 +0100 |
| commit | e5962c4232be43111fb7ca4a60fa0360f49ee9f5 (patch) | |
| tree | 7d2cc83a03ead10d150f231992fe48a33f91c692 /src | |
| parent | 9fc504937632389a8570606f3b9518f13e6a8e24 (diff) | |
| download | rabbitmq-server-git-e5962c4232be43111fb7ca4a60fa0360f49ee9f5.tar.gz | |
Reworked. Because the disk->mixed transition doesn't eat up any ram, there is no need for the emergency tokens, nor any need for the weird doubling. So it's basically got much simpler.
We hold two queues, one of hibernating queues (ordered by when they hibernated) and another priority_queue of lowrate queues (ordered by the amount of memory allocated to them). We evict to disk from the hibernated and then the lowrate queues in their relevant orders. Seems to work. Oh and disk_queue is now managed by the tokens too.
Diffstat (limited to 'src')
| -rw-r--r-- | src/priority_queue.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 104 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 406 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 58 |
8 files changed, 435 insertions, 212 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c41c..9683809933 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, pout/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,8 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(pout/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +150,59 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +pout({queue, [], []}) -> + {empty, {queue, [], []}}; +pout({queue, _, _} = Q) -> + {{value, V}, Q1} = out(Q), + {{value, V, 0}, Q1}; +pout({pqueue, [{P, Q} | Queues]}) -> + {{value, V}, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], []}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {{value, V, -P}, NewQ}. + +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 986546dce7..6b19695157 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,7 +102,8 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, Mode} = rabbit_queue_mode_manager:register(self()), + {ok, Mode} = rabbit_queue_mode_manager:register + (self(), rabbit_amqqueue, set_mode, [self()]), {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode), {ok, #q{q = Q, owner = none, @@ -141,7 +142,7 @@ reply(Reply, NewState) -> reply1(Reply, NewState = #q { hibernated_at = undefined }) -> {reply, Reply, NewState, NewState #q.hibernate_after}; reply1(Reply, NewState) -> - NewState1 = report_memory(false, adjust_hibernate_after(NewState)), + NewState1 = adjust_hibernate_after(NewState), {reply, Reply, NewState1, NewState1 #q.hibernate_after}. noreply(NewState = #q { memory_report_timer = undefined }) -> @@ -152,7 +153,7 @@ noreply(NewState) -> noreply1(NewState = #q { hibernated_at = undefined }) -> {noreply, NewState, NewState #q.hibernate_after}; noreply1(NewState) -> - NewState1 = report_memory(false, adjust_hibernate_after(NewState)), + NewState1 = adjust_hibernate_after(NewState), {noreply, NewState1, NewState1 #q.hibernate_after}. adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index dc328792a0..6674ce0e4d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -46,23 +46,24 @@ -export([length/1, filesync/0, cache_info/0]). --export([stop/0, stop_and_obliterate/0, conserve_memory/2, - to_disk_only_mode/0, to_ram_disk_mode/0]). +-export([stop/0, stop_and_obliterate/0, report_memory/0, + set_mode/1, to_disk_only_mode/0, to_ram_disk_mode/0]). -include("rabbit.hrl"). --define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK, 255). --define(INTEGER_SIZE_BYTES, 8). --define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). --define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(FILE_EXTENSION_DETS, ".dets"). --define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK, 255). +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -define(SERVER, ?MODULE). @@ -89,7 +90,9 @@ on_sync_froms, %% list of commiters to run on sync (reversed) timer_ref, %% TRef for our interval timer last_sync_offset, %% current_offset at the last time we sync'd - message_cache %% ets message cache + message_cache, %% ets message cache + memory_report_timer, %% TRef for the memory report timer + wordsize %% bytes in a word on this platform }). %% The components: @@ -267,7 +270,8 @@ -spec(length/1 :: (queue_name()) -> non_neg_integer()). -spec(filesync/0 :: () -> 'ok'). -spec(cache_info/0 :: () -> [{atom(), term()}]). --spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). +-spec(report_memory/0 :: () -> 'ok'). +-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok'). -endif. @@ -339,8 +343,11 @@ filesync() -> cache_info() -> gen_server2:call(?SERVER, cache_info, infinity). -conserve_memory(_Pid, Conserve) -> - gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}). +report_memory() -> + gen_server2:cast(?SERVER, report_memory). + +set_mode(Mode) -> + gen_server2:cast(?SERVER, {set_mode, Mode}). %% ---- GEN-SERVER INTERNAL API ---- @@ -354,7 +361,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), - ok = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + {ok, Mode} = rabbit_queue_mode_manager:register + (self(), rabbit_disk_queue, set_mode, []), Node = node(), ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node, @@ -381,6 +389,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL, + rabbit_disk_queue, report_memory, []), + + InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, @@ -402,7 +414,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> timer_ref = undefined, last_sync_offset = 0, message_cache = ets:new(?CACHE_ETS_NAME, - [set, private]) + [set, private]), + memory_report_timer = TRef, + wordsize = erlang:system_info(wordsize) }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -419,7 +433,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> false -> %% new file, so preallocate ok = preallocate(FileHdl, FileSizeLimit, Offset) end, - {ok, State1 #dqstate { current_file_handle = FileHdl }}. + State2 = State1 #dqstate { current_file_handle = FileHdl }, + {ok, case Mode of + mixed -> State2; + disk -> to_disk_only_mode(State2) + end}. handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), @@ -493,11 +511,15 @@ handle_cast({delete_queue, Q}, State) -> noreply(State1); handle_cast(filesync, State) -> noreply(sync_current_file_handle(State)); -handle_cast({conserve_memory, Conserve}, State) -> - noreply((case Conserve of - true -> fun to_disk_only_mode/1; - false -> fun to_ram_disk_mode/1 - end)(State)). +handle_cast({set_mode, Mode}, State) -> + noreply((case Mode of + disk -> fun to_disk_only_mode/1; + mixed -> fun to_ram_disk_mode/1 + end)(State)); +handle_cast(report_memory, State) -> + Bytes = memory_use(State), + rabbit_queue_mode_manager:report_memory(self(), Bytes), + noreply(State). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -513,10 +535,12 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge} + read_file_handles = {ReadHdls, _ReadHdlsAge}, + memory_report_timer = TRef }) -> - State1 = stop_commit_timer(State), %% deliberately ignoring return codes here + timer:cancel(TRef), + State1 = stop_commit_timer(State), dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), @@ -531,7 +555,8 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, end, ok, ReadHdls), State1 #dqstate { current_file_handle = undefined, current_dirty = false, - read_file_handles = {dict:new(), gb_trees:empty()} + read_file_handles = {dict:new(), gb_trees:empty()}, + memory_report_timer = undefined }. code_change(_OldVsn, State, _Extra) -> @@ -539,6 +564,27 @@ code_change(_OldVsn, State, _Extra) -> %% ---- UTILITY FUNCTIONS ---- +memory_use(#dqstate { operation_mode = ram_disk, + file_summary = FileSummary, + sequences = Sequences, + msg_location_ets = MsgLocationEts, + wordsize = WordSize + }) -> + WordSize * (mnesia:table_info(rabbit_disk_queue, memory) + + ets:info(MsgLocationEts, memory) + + ets:info(FileSummary, memory) + + ets:info(Sequences, memory)); +memory_use(#dqstate { operation_mode = disk_only, + file_summary = FileSummary, + sequences = Sequences, + msg_location_dets = MsgLocationDets, + wordsize = WordSize + }) -> + (WordSize * (ets:info(FileSummary, memory) + + ets:info(Sequences, memory))) + + mnesia:table_info(rabbit_disk_queue, memory) + + dets:info(MsgLocationDets, memory). + to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> State; to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2971e33265..e66eb6b088 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -52,7 +52,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -115,7 +115,8 @@ -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). - +-spec(ceil/1 :: (number()) -> number()). + -endif. %%---------------------------------------------------------------------------- @@ -442,3 +443,8 @@ unfold(Fun, Acc, Init) -> {true, E, I} -> unfold(Fun, [E|Acc], I); false -> {Acc, Init} end. + +ceil(N) when N - trunc(N) > 0 -> + 1 + trunc(N); +ceil(N) -> + N. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 26fa029db7..d9c4689851 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -541,7 +541,7 @@ is_empty(#mqstate { length = Length }) -> estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> - {2*Size, Gain, Loss}. + {Size, Gain, Loss}. reset_counters(State) -> State #mqstate { memory_gain = 0, memory_loss = 0 }. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 149501f8b3..b40294f686 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -150,7 +150,7 @@ table_definitions() -> {type, set}, {local_content, true}, {attributes, record_info(fields, dq_msg_loc)}, - {disc_only_copies, [node()]}]} + {disc_copies, [node()]}]} ]. replicated_table_definitions() -> diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 395249782b..30695404c3 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,10 +38,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/1, report_memory/5]). +-export([register/4, report_memory/2, report_memory/5, info/0]). -define(TOTAL_TOKENS, 1000). --define(LOW_WATER_MARK_FRACTION, 0.25). -define(ACTIVITY_THRESHOLD, 25). -define(INITIAL_TOKEN_ALLOCATION, 10). @@ -53,7 +52,7 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(register/1 :: (pid()) -> {'ok', queue_mode()}). +-spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}). -spec(report_memory/5 :: (pid(), non_neg_integer(), non_neg_integer(), non_neg_integer(), bool()) -> 'ok'). @@ -61,153 +60,153 @@ -endif. -record(state, { available_tokens, - available_etokens, mixed_queues, + callbacks, tokens_per_byte, - low_rate, - hibernated + lowrate, + hibernate }). start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -register(Pid) -> - gen_server2:call(?SERVER, {register, Pid}). +register(Pid, Module, Function, Args) -> + gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}). + +report_memory(Pid, Memory) -> + report_memory(Pid, Memory, undefined, undefined, false). report_memory(Pid, Memory, Gain, Loss, Hibernating) -> gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Gain, Loss, Hibernating}). +info() -> + gen_server2:call(?SERVER, info). + init([]) -> process_flag(trap_exit, true), %% todo, fix up this call as os_mon may not be running {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(), - MemAvail = MemTotal - MemUsed, - Avail = ceil(?TOTAL_TOKENS * (1 - ?LOW_WATER_MARK_FRACTION)), - EAvail = ?TOTAL_TOKENS - Avail, - {ok, #state { available_tokens = Avail, - available_etokens = EAvail, + MemAvail = (MemTotal - MemUsed) / 3, %% magic + {ok, #state { available_tokens = ?TOTAL_TOKENS, mixed_queues = dict:new(), + callbacks = dict:new(), tokens_per_byte = ?TOTAL_TOKENS / MemAvail, - low_rate = sets:new(), - hibernated = sets:new() + lowrate = priority_queue:new(), + hibernate = queue:new() }}. -handle_call({register, Pid}, _From, - State = #state { available_tokens = Avail, - mixed_queues = Mixed }) -> +handle_call({register, Pid, Module, Function, Args}, _From, + State = #state { callbacks = Callbacks }) -> _MRef = erlang:monitor(process, Pid), - {Result, State1} = + State1 = State #state { callbacks = dict:store + (Pid, {Module, Function, Args}, Callbacks) }, + State2 = #state { available_tokens = Avail, + mixed_queues = Mixed } = + free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1), + {Result, State3} = case ?INITIAL_TOKEN_ALLOCATION > Avail of true -> - {disk, State}; + {disk, State2}; false -> - {mixed, State #state { mixed_queues = dict:store - (Pid, {?INITIAL_TOKEN_ALLOCATION, 0}, Mixed) }} + {mixed, State2 #state { + available_tokens = + Avail - ?INITIAL_TOKEN_ALLOCATION, + mixed_queues = dict:store + (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }} end, - {reply, {ok, Result}, State1}. + {reply, {ok, Result}, State3}; + +handle_call(info, _From, State) -> + State1 = #state { available_tokens = Avail, + mixed_queues = Mixed, + lowrate = Lazy, + hibernate = Sleepy } = + free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying + {reply, [{ available_tokens, Avail }, + { mixed_queues, dict:to_list(Mixed) }, + { lowrate_queues, priority_queue:to_list(Lazy) }, + { hibernated_queues, queue:to_list(Sleepy) }], State1}. + -handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, - State = #state { available_tokens = Avail, - available_etokens = EAvail, - tokens_per_byte = TPB, - mixed_queues = Mixed - }) -> - Req = ceil(Memory * TPB), - io:format("~w : ~w ~w ~n", [Pid, Memory, Req]), - LowRate = (BytesGained < ?ACTIVITY_THRESHOLD) - andalso (BytesLost < ?ACTIVITY_THRESHOLD), - io:format("~w ~w~n", [O, LowRate]), - State1 = - case find_queue(Pid, State) of - disk -> - case Req > Avail orelse (2*Req) > (Avail + EAvail) orelse - LowRate of - true -> State; %% remain as disk queue - false -> - %% go to mixed, allocate double Req, and use Extra - rabbit_amqqueue:set_mode(Pid, mixed), - Alloc = lists:min([2*Req, Avail]), - EAlloc = (2*Req) - Alloc, - State #state { available_tokens = Avail - Alloc, - available_etokens = EAvail - EAlloc, - mixed_queues = dict:store - (Pid, {Alloc, EAlloc}, Mixed) - } - end; - {mixed, {OAlloc, OEAlloc}} -> - io:format("~w ; ~w ~w ~n", [Pid, OAlloc, OEAlloc]), +handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, + State = #state { mixed_queues = Mixed, + available_tokens = Avail, + callbacks = Callbacks, + tokens_per_byte = TPB }) -> + Req = rabbit_misc:ceil(TPB * Memory), + LowRate = case {BytesGained, BytesLost} of + {undefined, _} -> false; + {_, undefined} -> false; + {G, L} -> G < ?ACTIVITY_THRESHOLD andalso + L < ?ACTIVITY_THRESHOLD + end, + {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} = + case find_queue(Pid, Mixed) of + {mixed, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, - EAvail1 = EAvail + OEAlloc, - case Req > (OAlloc + OEAlloc) of - true -> %% getting bigger - case Req > Avail1 of - true -> %% go to disk - attempt_free_from_idle(Req, Pid, - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }); - false -> %% request not too big, stay mixed - State #state { available_tokens = Avail1 - Req, - available_etokens = EAvail1, - mixed_queues = dict:store - (Pid, {Req, 0}, Mixed) } - end; - false -> %% getting smaller (or staying same) - case 0 =:= OEAlloc of - true -> - case Req > Avail1 orelse LowRate of - true -> %% go to disk - attempt_free_from_idle(Req, Pid, - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }); - false -> %% request not too big, stay mixed - State #state { available_tokens = Avail1 - Req, - available_etokens = EAvail1, - mixed_queues = dict:store - (Pid, {Req, 0}, Mixed) } - end; - false -> - case Req > Avail1 of - true -> - EReq = Req - Avail1, - case EReq > EAvail1 of - true -> %% go to disk - attempt_free_from_idle(Req, Pid, - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }); - false -> %% request not too big, stay mixed - State #state { available_tokens = 0, - available_etokens = EAvail1 - EReq, - mixed_queues = dict:store - (Pid, {Avail1, EReq}, Mixed) } - end; - false -> %% request not too big, stay mixed - State #state { available_tokens = Avail1 - Req, - available_etokens = EAvail1, - mixed_queues = dict:store - (Pid, {Req, 0}, Mixed) } - end - end + State1 = #state { available_tokens = Avail2, + mixed_queues = Mixed1 } = + free_upto(Pid, Req, + State #state { available_tokens = Avail1 }), + case Req > Avail2 of + true -> %% nowt we can do, send to disk + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [disk]), + {State1 #state { mixed_queues = + dict:erase(Pid, Mixed1) }, + disk}; + false -> %% keep mixed + Activity = if Hibernating -> hibernate; + LowRate -> lowrate; + true -> active + end, + {State1 #state + { mixed_queues = + dict:store(Pid, {Req, Activity}, Mixed1), + available_tokens = Avail2 - Req }, + Activity} + end; + disk -> + State1 = #state { available_tokens = Avail1, + mixed_queues = Mixed1 } = + free_upto(Pid, Req, State), + case Req > Avail1 of + true -> %% not enough space, stay as disk + {State1, disk}; + false -> %% can go to mixed mode + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [mixed]), + Activity = if Hibernating -> hibernate; + LowRate -> lowrate; + true -> active + end, + {State1 #state { + mixed_queues = + dict:store(Pid, {Req, Activity}, Mixed1), + available_tokens = Avail1 - Req }, + disk} end end, - {noreply, State1}. + StateN1 = + case ActivityNew of + active -> StateN; + disk -> StateN; + lowrate -> StateN #state { lowrate = + priority_queue:in(Pid, Req, Lazy) }; + hibernate -> StateN #state { hibernate = + queue:in(Pid, Sleepy) } + end, + {noreply, StateN1}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, - available_etokens = EAvail, mixed_queues = Mixed }) -> - State1 = case find_queue(Pid, State) of + State1 = case find_queue(Pid, Mixed) of disk -> State; - {mixed, {Alloc, EAlloc}} -> + {mixed, {Alloc, _Activity}} -> State #state { available_tokens = Avail + Alloc, - available_etokens = EAvail + EAlloc, mixed_queues = dict:erase(Pid, Mixed) } end, {noreply, State1}; @@ -222,69 +221,140 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -find_queue(Pid, #state { mixed_queues = Mixed }) -> +find_queue(Pid, Mixed) -> case dict:find(Pid, Mixed) of {ok, Value} -> {mixed, Value}; error -> disk end. -ceil(N) when N - trunc(N) > 0 -> - 1 + trunc(N); -ceil(N) -> - N. +tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) -> + tidy_and_sum_lazy(sets:add_element(IgnorePid, sets:new()), + Lazy, Mixed, 0, priority_queue:new()). -attempt_free_from_idle(Req, Pid, State = #state { available_tokens = Avail, - available_etokens = EAvail, - low_rate = Lazy, - hibernated = Sleepy, - mixed_queues = Mixed }) -> - case Req > Avail of - true -> - {Sleepy1, Freed, EFreed, State1} = free_upto(Req, sets:to_list(Sleepy), State), - case Req > Avail + Freed of +tidy_and_sum_lazy(DupCheckSet, Lazy, Mixed, FreeAcc, LazyAcc) -> + case priority_queue:pout(Lazy) of + {empty, Lazy} -> {FreeAcc, LazyAcc}; + {{value, Pid, Alloc}, Lazy1} -> + case sets:is_element(Pid, DupCheckSet) of true -> - {Lazy1, Freed1, EFreed1, State2} = free_upto(Req, sets:to_list(Lazy), State1), - case Req > Avail + Freed + Freed1 of - true -> - rabbit_amqqueue:set_mode(Pid, disk), - State2 #state { available_tokens = Avail + Freed + Freed1, - available_etokens = EAvail + EFreed + EFreed1, - low_rate = Lazy1, - hibernated = Sleepy1, - mixed_queues = dict:erase(Pid, Mixed) - }; - false -> - State2 #state { available_tokens = Avail + Freed + Freed1 - Req, - available_etokens = EAvail + EFreed + EFreed1, - low_rate = Lazy1, - hibernated = Sleepy1, - mixed_queues = dict:store(Pid, {Req, 0}, Mixed) - } - end; + tidy_and_sum_lazy(DupCheckSet, Lazy1, Mixed, FreeAcc, + LazyAcc); false -> - State1 #state { available_tokens = Avail + Freed - Req, - available_etokens = EAvail + EFreed, - hibernated = Sleepy1, - mixed_queues = dict:store(Pid, {Req, 0}, Mixed) - } - end; - false -> - State #state { mixed_queues = dict:store(Pid, {Req, 0}, Mixed) } + DupCheckSet1 = sets:add_element(Pid, DupCheckSet), + case find_queue(Pid, Mixed) of + {mixed, {Alloc, lowrate}} -> + tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed, + FreeAcc + Alloc, priority_queue:in + (Pid, Alloc, LazyAcc)); + _ -> + tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed, + FreeAcc, LazyAcc) + end + end + end. + +tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) -> + tidy_and_sum_sleepy(sets:add_element(IgnorePid, sets:new()), + Sleepy, Mixed, 0, queue:new()). + +tidy_and_sum_sleepy(DupCheckSet, Sleepy, Mixed, FreeAcc, SleepyAcc) -> + case queue:out(Sleepy) of + {empty, Sleepy} -> {FreeAcc, SleepyAcc}; + {{value, Pid}, Sleepy1} -> + case sets:is_element(Pid, DupCheckSet) of + true -> + tidy_and_sum_sleepy(DupCheckSet, Sleepy1, Mixed, FreeAcc, + SleepyAcc); + false -> + DupCheckSet1 = sets:add_element(Pid, DupCheckSet), + case find_queue(Pid, Mixed) of + {mixed, {Alloc, hibernate}} -> + tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed, + FreeAcc + Alloc, queue:in + (Pid, SleepyAcc)); + _ -> tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed, + FreeAcc, SleepyAcc) + end + end + end. + +free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) -> + free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, + priority_queue:new()). + +free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, LazyAcc) -> + case priority_queue:pout(Lazy) of + {empty, Lazy} -> {priority_queue:join(Lazy, LazyAcc), Mixed, Req}; + {{value, IgnorePid, Alloc}, Lazy1} -> + free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed, Req, + priority_queue:in(IgnorePid, Alloc, LazyAcc)); + {{value, Pid, Alloc}, Lazy1} -> + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [disk]), + Mixed1 = dict:erase(Pid, Mixed), + case Req > Alloc of + true -> free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed1, + Req - Alloc, LazyAcc); + false -> {priority_queue:join(Lazy1, LazyAcc), Mixed1, + Req - Alloc} + end + end. + +free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) -> + free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, queue:new()). + +free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, SleepyAcc) -> + case queue:out(Sleepy) of + {empty, Sleepy} -> {queue:join(Sleepy, SleepyAcc), Mixed, Req}; + {{value, IgnorePid}, Sleepy1} -> + free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed, Req, + queue:in(IgnorePid, SleepyAcc)); + {{value, Pid}, Sleepy1} -> + {Alloc, hibernate} = dict:fetch(Pid, Mixed), + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [disk]), + Mixed1 = dict:erase(Pid, Mixed), + case Req > Alloc of + true -> free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed1, + Req - Alloc, SleepyAcc); + false -> {queue:join(Sleepy1, SleepyAcc), Mixed1, Req - Alloc} + end end. -free_upto(Req, List, State) -> - free_upto(Req, List, 0, 0, State). - -free_upto(_Req, [], Freed, EFreed, State) -> - {[], Freed, EFreed, State}; -free_upto(Req, [Pid|Pids], Freed, EFreed, State = #state { available_tokens = Avail, - mixed_queues = Mixed }) -> - {mixed, {Alloc, EAlloc}} = find_queue(Pid, State), - rabbit_amqqueue:set_mode(Pid, disk), - State1 = State #state { mixed_queues = dict:erase(Pid, Mixed) }, - case Req > Avail + Freed + Alloc of +free_upto(Pid, Req, State = #state { available_tokens = Avail, + mixed_queues = Mixed, + callbacks = Callbacks, + lowrate = Lazy, + hibernate = Sleepy }) -> + case Req > Avail of true -> - free_upto(Req, Pids, Freed + Alloc, EFreed + EAlloc, State1); - false -> - {Pids, Freed + Alloc, EFreed + EAlloc, State1} + {SleepySum, Sleepy1} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed), + case Req > Avail + SleepySum of + true -> %% not enough in sleepy, have a look in lazy too + {LazySum, Lazy1} = tidy_and_sum_lazy(Pid, Lazy, Mixed), + case Req > Avail + SleepySum + LazySum of + true -> %% can't free enough, just return tidied state + State #state { lowrate = Lazy1, + hibernate = Sleepy1 }; + false -> %% need to free all of sleepy, and some of lazy + {Sleepy2, Mixed1, ReqRem} = + free_upto_sleepy + (Pid, Callbacks, Sleepy1, Mixed, Req), + {Lazy2, Mixed2, ReqRem1} = + free_upto_lazy(Pid, Callbacks, Lazy1, Mixed1, + ReqRem), + State #state { available_tokens = + Avail + (Req - ReqRem1), + mixed_queues = Mixed2, + lowrate = Lazy2, + hibernate = Sleepy2 } + end; + false -> %% enough available in sleepy, don't touch lazy + {Sleepy2, Mixed1, ReqRem} = + free_upto_sleepy(Pid, Callbacks, Sleepy1, Mixed, Req), + State #state { available_tokens = Avail + (Req - ReqRem), + mixed_queues = Mixed1, + hibernate = Sleepy2 } + end; + false -> State end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b56d71c8c2..f108285056 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -65,7 +65,7 @@ test_priority_queue() -> %% empty Q Q = priority_queue:new(), - {true, true, 0, [], []} = test_priority_queue(Q), + {true, true, 0, [], [], []} = test_priority_queue(Q), %% 1-4 element no-priority Q true = lists:all(fun (X) -> X =:= passed end, @@ -74,21 +74,57 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo], [{foo, 1}]} = test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), - {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} = test_priority_queue(Q2), %% 2-element different-priority Q Q3 = priority_queue:in(bar, 2, Q1), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = test_priority_queue(Q3), %% 1-element negative priority Q Q4 = priority_queue:in(foo, -1, priority_queue:new()), - {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + {true, false, 1, [{-1, foo}], [foo], [{foo, -1}]} = test_priority_queue(Q4), + + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [{foo, 0}, {bar, 0}]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [{bar, 1}, {foo, 0}]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [{foo, 1}, {bar, 0}]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = + test_priority_queue(Q10), passed. @@ -101,18 +137,26 @@ priority_queue_out_all(Q) -> {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] end. +priority_queue_pout_all(Q) -> + case priority_queue:pout(Q) of + {empty, _} -> []; + {{value, V, P}, Q1} -> [{V, P} | priority_queue_pout_all(Q1)] + end. + test_priority_queue(Q) -> {priority_queue:is_queue(Q), priority_queue:is_empty(Q), priority_queue:len(Q), priority_queue:to_list(Q), - priority_queue_out_all(Q)}. + priority_queue_out_all(Q), + priority_queue_pout_all(Q)}. test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), ToListRes = [{0, X} || X <- Items], - {true, false, N, ToListRes, Items} = test_priority_queue(Q), + POutAllRes = [{X, 0} || X <- Items], + {true, false, N, ToListRes, Items, POutAllRes} = test_priority_queue(Q), passed. test_parsing() -> |
