diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-04 14:43:27 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-04 14:43:27 +0100 |
| commit | 8e06d03debb2d39449ad5e2b90599bb3b5a6e4e4 (patch) | |
| tree | de49033798f4bc1bc58ae6a19086179a6affd460 /src | |
| parent | 587e3cf8aa4c4308662b957b1a374a468402f70c (diff) | |
| parent | 234c2cd961f00ed0c2334a219ae098929cb25cb3 (diff) | |
| download | rabbitmq-server-git-8e06d03debb2d39449ad5e2b90599bb3b5a6e4e4.tar.gz | |
Merging default into bug24455
Diffstat (limited to 'src')
| -rw-r--r-- | src/bpqueue.erl | 271 | ||||
| -rw-r--r-- | src/lqueue.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 140 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 419 |
4 files changed, 271 insertions, 648 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl deleted file mode 100644 index 71a34262eb..0000000000 --- a/src/bpqueue.erl +++ /dev/null @@ -1,271 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(bpqueue). - -%% Block-prefixed queue. From the perspective of the queue interface -%% the datastructure acts like a regular queue where each value is -%% paired with the prefix. -%% -%% This is implemented as a queue of queues, which is more space and -%% time efficient, whilst supporting the normal queue interface. Each -%% inner queue has a prefix, which does not need to be unique, and it -%% is guaranteed that no two consecutive blocks have the same -%% prefix. len/1 returns the flattened length of the queue and is -%% O(1). - --export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1, map_fold_filter_l/4, - map_fold_filter_r/4]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --export_type([bpqueue/0]). - --type(bpqueue() :: {non_neg_integer(), queue()}). --type(prefix() :: any()). --type(value() :: any()). --type(result() :: ({'empty', bpqueue()} | - {{'value', prefix(), value()}, bpqueue()})). - --spec(new/0 :: () -> bpqueue()). --spec(is_empty/1 :: (bpqueue()) -> boolean()). --spec(len/1 :: (bpqueue()) -> non_neg_integer()). --spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). --spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). --spec(out/1 :: (bpqueue()) -> result()). --spec(out_r/1 :: (bpqueue()) -> result()). --spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()). --spec(foldl/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). --spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). --spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). --spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). --spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())), - (fun ((value(), B) -> - ({prefix(), value(), B} | 'stop'))), - B, - bpqueue()) -> - {bpqueue(), B}). --spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())), - (fun ((value(), B) -> - ({prefix(), value(), B} | 'stop'))), - B, - bpqueue()) -> - {bpqueue(), B}). - --endif. - -%%---------------------------------------------------------------------------- - -new() -> {0, queue:new()}. - -is_empty({0, _Q}) -> true; -is_empty(_BPQ) -> false. - -len({N, _Q}) -> N. - -in(Prefix, Value, {0, Q}) -> - {1, queue:in({Prefix, queue:from_list([Value])}, Q)}; -in(Prefix, Value, BPQ) -> - in1({fun queue:in/2, fun queue:out_r/1}, Prefix, Value, BPQ). - -in_r(Prefix, Value, BPQ = {0, _Q}) -> - in(Prefix, Value, BPQ); -in_r(Prefix, Value, BPQ) -> - in1({fun queue:in_r/2, fun queue:out/1}, Prefix, Value, BPQ). - -in1({In, Out}, Prefix, Value, {N, Q}) -> - {N+1, case Out(Q) of - {{value, {Prefix, InnerQ}}, Q1} -> - In({Prefix, In(Value, InnerQ)}, Q1); - {{value, {_Prefix, _InnerQ}}, _Q1} -> - In({Prefix, queue:in(Value, queue:new())}, Q) - end}. - -in_q(Prefix, Queue, BPQ = {0, Q}) -> - case queue:len(Queue) of - 0 -> BPQ; - N -> {N, queue:in({Prefix, Queue}, Q)} - end; -in_q(Prefix, Queue, BPQ) -> - in_q1({fun queue:in/2, fun queue:out_r/1, - fun queue:join/2}, - Prefix, Queue, BPQ). - -in_q_r(Prefix, Queue, BPQ = {0, _Q}) -> - in_q(Prefix, Queue, BPQ); -in_q_r(Prefix, Queue, BPQ) -> - in_q1({fun queue:in_r/2, fun queue:out/1, - fun (T, H) -> queue:join(H, T) end}, - Prefix, Queue, BPQ). - -in_q1({In, Out, Join}, Prefix, Queue, BPQ = {N, Q}) -> - case queue:len(Queue) of - 0 -> BPQ; - M -> {N + M, case Out(Q) of - {{value, {Prefix, InnerQ}}, Q1} -> - In({Prefix, Join(InnerQ, Queue)}, Q1); - {{value, {_Prefix, _InnerQ}}, _Q1} -> - In({Prefix, Queue}, Q) - end} - end. - -out({0, _Q} = BPQ) -> {empty, BPQ}; -out(BPQ) -> out1({fun queue:in_r/2, fun queue:out/1}, BPQ). - -out_r({0, _Q} = BPQ) -> {empty, BPQ}; -out_r(BPQ) -> out1({fun queue:in/2, fun queue:out_r/1}, BPQ). - -out1({In, Out}, {N, Q}) -> - {{value, {Prefix, InnerQ}}, Q1} = Out(Q), - {{value, Value}, InnerQ1} = Out(InnerQ), - Q2 = case queue:is_empty(InnerQ1) of - true -> Q1; - false -> In({Prefix, InnerQ1}, Q1) - end, - {{value, Prefix, Value}, {N-1, Q2}}. - -join({0, _Q}, BPQ) -> - BPQ; -join(BPQ, {0, _Q}) -> - BPQ; -join({NHead, QHead}, {NTail, QTail}) -> - {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead), - {NHead + NTail, - case queue:out(QTail) of - {{value, {Prefix, InnerQTail}}, QTail1} -> - queue:join( - queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1), - QTail1); - {{value, {_Prefix, _InnerQTail}}, _QTail1} -> - queue:join(QHead, QTail) - end}. - -foldl(_Fun, Init, {0, _Q}) -> Init; -foldl( Fun, Init, {_N, Q}) -> fold1(fun queue:out/1, Fun, Init, Q). - -foldr(_Fun, Init, {0, _Q}) -> Init; -foldr( Fun, Init, {_N, Q}) -> fold1(fun queue:out_r/1, Fun, Init, Q). - -fold1(Out, Fun, Init, Q) -> - case Out(Q) of - {empty, _Q} -> - Init; - {{value, {Prefix, InnerQ}}, Q1} -> - fold1(Out, Fun, fold1(Out, Fun, Prefix, Init, InnerQ), Q1) - end. - -fold1(Out, Fun, Prefix, Init, InnerQ) -> - case Out(InnerQ) of - {empty, _Q} -> - Init; - {{value, Value}, InnerQ1} -> - fold1(Out, Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1) - end. - -from_list(List) -> - {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} = - lists:foldl( - fun ({_Prefix, []}, Acc) -> - Acc; - ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> - {Prefix, queue:join(InnerQ, queue:from_list(InnerList)), - ListOfPQs, LenAcc + length(InnerList)}; - ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> - {Prefix1, queue:from_list(InnerList), - [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)} - end, {undefined, queue:new(), [], 0}, List), - ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1], - [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2), - {Len, queue:from_list(case queue:is_empty(InnerQ1) of - true -> Rest; - false -> All - end)}. - -to_list({0, _Q}) -> []; -to_list({_N, Q}) -> [{Prefix, queue:to_list(InnerQ)} || - {Prefix, InnerQ} <- queue:to_list(Q)]. - -%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} -%% where FilterFun(Prefix) -> boolean() -%% Fun(Value, Init) -> {Prefix, Value, Init} | stop -%% -%% The filter fun allows you to skip very quickly over blocks that -%% you're not interested in. Such blocks appear in the resulting bpq -%% without modification. The Fun is then used both to map the value, -%% which also allows you to change the prefix (and thus block) of the -%% value, and also to modify the Init/Acc (just like a fold). If the -%% Fun returns 'stop' then it is not applied to any further items. -map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> - {BPQ, Init}; -map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> - map_fold_filter1({fun queue:out/1, fun queue:in/2, - fun in_q/3, fun join/2}, - N, PFilter, Fun, Init, Q, new()). - -map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> - {BPQ, Init}; -map_fold_filter_r(PFilter, Fun, Init, {N, Q}) -> - map_fold_filter1({fun queue:out_r/1, fun queue:in_r/2, - fun in_q_r/3, fun (T, H) -> join(H, T) end}, - N, PFilter, Fun, Init, Q, new()). - -map_fold_filter1(Funs = {Out, _In, InQ, Join}, Len, PFilter, Fun, - Init, Q, QNew) -> - case Out(Q) of - {empty, _Q} -> - {QNew, Init}; - {{value, {Prefix, InnerQ}}, Q1} -> - case PFilter(Prefix) of - true -> - {Init1, QNew1, Cont} = - map_fold_filter2(Funs, Fun, Prefix, Prefix, - Init, InnerQ, QNew, queue:new()), - case Cont of - false -> {Join(QNew1, {Len - len(QNew1), Q1}), Init1}; - true -> map_fold_filter1(Funs, Len, PFilter, Fun, - Init1, Q1, QNew1) - end; - false -> - map_fold_filter1(Funs, Len, PFilter, Fun, - Init, Q1, InQ(Prefix, InnerQ, QNew)) - end - end. - -map_fold_filter2(Funs = {Out, In, InQ, _Join}, Fun, OrigPrefix, Prefix, - Init, InnerQ, QNew, InnerQNew) -> - case Out(InnerQ) of - {empty, _Q} -> - {Init, InQ(OrigPrefix, InnerQ, - InQ(Prefix, InnerQNew, QNew)), true}; - {{value, Value}, InnerQ1} -> - case Fun(Value, Init) of - stop -> - {Init, InQ(OrigPrefix, InnerQ, - InQ(Prefix, InnerQNew, QNew)), false}; - {Prefix1, Value1, Init1} -> - {Prefix2, QNew1, InnerQNew1} = - case Prefix1 =:= Prefix of - true -> {Prefix, QNew, In(Value1, InnerQNew)}; - false -> {Prefix1, InQ(Prefix, InnerQNew, QNew), - In(Value1, queue:new())} - end, - map_fold_filter2(Funs, Fun, OrigPrefix, Prefix2, - Init1, InnerQ1, QNew1, InnerQNew1) - end - end. diff --git a/src/lqueue.erl b/src/lqueue.erl new file mode 100644 index 0000000000..4a8164f6db --- /dev/null +++ b/src/lqueue.erl @@ -0,0 +1,89 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% + +-module(lqueue). + +-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, + foldl/3, foldr/3, from_list/1, to_list/1]). + +-define(QUEUE, queue). + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {non_neg_integer(), ?MODULE()}). +-type(value() :: any()). +-type(result() :: ({'empty', ?MODULE()} | + {{'value', value()}, ?MODULE()})). + +-spec(new/0 :: () -> ?MODULE()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(len/1 :: (?MODULE()) -> non_neg_integer()). +-spec(in/2 :: (value(), ?MODULE()) -> ?MODULE()). +-spec(in_r/2 :: (value(), ?MODULE()) -> ?MODULE()). +-spec(out/1 :: (?MODULE()) -> result()). +-spec(out_r/1 :: (?MODULE()) -> result()). +-spec(join/2 :: (?MODULE(), ?MODULE()) -> ?MODULE()). +-spec(foldl/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). +-spec(foldr/3 :: (fun ((value(), B) -> B), B, ?MODULE()) -> B). +-spec(from_list/1 :: ([value()]) -> ?MODULE()). +-spec(to_list/1 :: (?MODULE()) -> [value()]). + +-endif. + +new() -> {0, ?QUEUE:new()}. + +is_empty({0, _Q}) -> true; +is_empty(_) -> false. + +in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}. + +in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}. + +out({0, _Q} = Q) -> + {empty, Q}; +out({L, Q}) -> + {Result, Q1} = ?QUEUE:out(Q), + {Result, {L-1, Q1}}. + +out_r({0, _Q} = Q) -> + {empty, Q}; +out_r({L, Q}) -> + {Result, Q1} = ?QUEUE:out_r(Q), + {Result, {L-1, Q1}}. + +join({L1, Q1}, {L2, Q2}) -> + {L1 + L2, ?QUEUE:join(Q1, Q2)}. + +to_list({_L, Q}) -> ?QUEUE:to_list(Q). + +from_list(L) -> {length(L), ?QUEUE:from_list(L)}. + +foldl(Fun, Init, Q) -> + case out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1) + end. + +foldr(Fun, Init, Q) -> + case out_r(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) + end. + +len({L, _Q}) -> + L. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 39f67ced2d..eb456e8cf9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -44,7 +44,6 @@ all_tests() -> passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), - passed = test_bpqueue(), passed = test_pg_local(), passed = test_unfold(), passed = test_supervisor_delayed_restart(), @@ -262,143 +261,6 @@ test_priority_queue(Q) -> priority_queue:to_list(Q), priority_queue_out_all(Q)}. -test_bpqueue() -> - Q = bpqueue:new(), - true = bpqueue:is_empty(Q), - 0 = bpqueue:len(Q), - [] = bpqueue:to_list(Q), - - Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1, - fun bpqueue:to_list/1, - fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4), - Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1, - fun (QR) -> lists:reverse( - [{P, lists:reverse(L)} || - {P, L} <- bpqueue:to_list(QR)]) - end, - fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4), - - [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)), - [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)), - [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] = - bpqueue:to_list(bpqueue:join(Q1, Q2)), - - [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] = - bpqueue:to_list(bpqueue:join(Q1, Q1)), - - [{foo, [1, 2]}, {bar, [3]}] = - bpqueue:to_list( - bpqueue:from_list( - [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])), - - [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])), - - {4, [a,b,c,d]} = - bpqueue:foldl( - fun (Prefix, Value, {Prefix, Acc}) -> - {Prefix + 1, [Value | Acc]} - end, - {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])), - - [{bar,3}, {foo,2}, {foo,1}] = - bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2), - - BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], - BPQ = bpqueue:from_list(BPQL), - - %% no effect - {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ), - {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ), - {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ), - {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ), - - %% process 1 item - {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffl([foo,bar], {foo, [2]}, BPQ), - {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffl([bar], {bar, [4]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} = - bpqueue_mffr([foo,bar], {foo, [6]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} = - bpqueue_mffr([bar], {baz, [4]}, BPQ), - - %% change prefix - {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} = - bpqueue_mffl([foo,bar], {bar, []}, BPQ), - {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} = - bpqueue_mffl([foo], {bar, [5]}, BPQ), - {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} = - bpqueue_mffl([foo], {bar, [7]}, BPQ), - {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} = - bpqueue_mffl([bar], {foo, [5]}, BPQ), - {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} = - bpqueue_mffl([foo], {bar, []}, BPQ), - {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} = - bpqueue_mffl([bar], {foo, []}, BPQ), - - %% edge cases - {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} = - bpqueue_mffl([foo], {foo, [5]}, BPQ), - {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} = - bpqueue_mffr([foo], {foo, [2]}, BPQ), - - passed. - -bpqueue_test(In, Out, List, Fold, MapFoldFilter) -> - Q = bpqueue:new(), - {empty, _Q} = Out(Q), - - ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q), - {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end, - fun(_V, _N) -> throw(explosion) end, 0, Q), - [] = bpqueue:to_list(Q1M), - - Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))), - false = bpqueue:is_empty(Q1), - 3 = bpqueue:len(Q1), - [{foo, [1, 2]}, {bar, [3]}] = List(Q1), - - {{value, foo, 1}, Q3} = Out(Q1), - {{value, foo, 2}, Q4} = Out(Q3), - {{value, bar, 3}, _Q5} = Out(Q4), - - F = fun (QN) -> - MapFoldFilter(fun (foo) -> true; - (_) -> false - end, - fun (2, _Num) -> stop; - (V, Num) -> {bar, -V, V - Num} end, - 0, QN) - end, - {Q6, 0} = F(Q), - [] = bpqueue:to_list(Q6), - {Q7, 1} = F(Q1), - [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7), - - Q1. - -bpqueue_mffl(FF1A, FF2A, BPQ) -> - bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ). - -bpqueue_mffr(FF1A, FF2A, BPQ) -> - bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ). - -bpqueue_mff(Fold, FF1A, FF2A, BPQ) -> - FF1 = fun (Prefixes) -> - fun (P) -> lists:member(P, Prefixes) end - end, - FF2 = fun ({Prefix, Stoppers}) -> - fun (Val, Num) -> - case lists:member(Val, Stoppers) of - true -> stop; - false -> {Prefix, -Val, 1 + Num} - end - end - end, - Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end, - - Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)). - test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), @@ -2312,7 +2174,7 @@ wait_for_confirms(Unconfirmed) -> test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, - fun test_variable_queue_partial_segments_delta_thing/1, + %%fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 94c0913dfc..45c6a17f69 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -280,8 +280,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(merge_funs, {new, join, out, in, publish}). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -291,6 +289,7 @@ -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(QUEUE, lqueue). -include("rabbit.hrl"). @@ -315,11 +314,11 @@ end_seq_id :: non_neg_integer() }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), + q1 :: ?QUEUE:?QUEUE(), + q2 :: ?QUEUE:?QUEUE(), delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), + q3 :: ?QUEUE:?QUEUE(), + q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), ram_ack_index :: gb_tree(), @@ -475,19 +474,19 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q4, + fun ?QUEUE:foldl/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = queue:new(), + State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( - fun rabbit_misc:queue_fold/3, Q1, + fun ?QUEUE:foldl/3, Q1, LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, @@ -593,11 +592,11 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), - alpha_funs(), + fun publish_alpha/2, MsgPropsFun, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - beta_funs(), + fun publish_beta/2, MsgPropsFun, State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, MsgPropsFun, State2), @@ -696,7 +695,6 @@ needs_timeout(State) -> false -> case reduce_memory_use( fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, State) of {true, _State} -> idle; @@ -725,11 +723,11 @@ status(#vqstate { avg_ingress = AvgIngressRate }, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, + [ {q1 , ?QUEUE:len(Q1)}, + {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, + {q3 , ?QUEUE:len(Q3)}, + {q4 , ?QUEUE:len(Q4)}, {len , Len}, {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, @@ -758,11 +756,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, persistent_count = PersistentCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> - E1 = queue:is_empty(Q1), - E2 = bpqueue:is_empty(Q2), + E1 = ?QUEUE:is_empty(Q1), + E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, - E3 = bpqueue:is_empty(Q3), - E4 = queue:is_empty(Q4), + E3 = ?QUEUE:is_empty(Q3), + E4 = ?QUEUE:is_empty(Q4), LZ = Len == 0, true = E1 or not E3, @@ -869,25 +867,26 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; false -> case gb_trees:is_defined(SeqId, PA) of - false -> {[m(#msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }) | Filtered1], - Delivers1, - Acks1}; - true -> Acc + false -> + {?QUEUE:in_r( + m(#msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps + }), Filtered1), + Delivers1, Acks1}; + true -> + Acc end end - end, {[], [], []}, List), - {bpqueue:from_list([{true, Filtered}]), - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. + end, {?QUEUE:new(), [], []}, List), + {Filtered, rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. %% the first arg is the older delta combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> @@ -915,8 +914,18 @@ combine_deltas(#delta { start_seq_id = StartLow, andalso ((StartLow + Count) =< EndHigh), #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. -beta_fold(Fun, Init, Q) -> - bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). +expand_delta(SeqId, Delta) -> + DeltaInc = #delta { start_seq_id = SeqId, + count = 1, + end_seq_id = SeqId + 1 }, + case Delta of + ?BLANK_DELTA -> + DeltaInc; + #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId -> + combine_deltas(DeltaInc, Delta); + #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId -> + combine_deltas(Delta, DeltaInc) + end. update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -939,11 +948,11 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), + q1 = ?QUEUE:new(), + q2 = ?QUEUE:new(), delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), + q3 = ?QUEUE:new(), + q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty(), @@ -983,19 +992,19 @@ blank_rate(Timestamp, IngressLength) -> in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case queue:is_empty(Q4) of + case ?QUEUE:is_empty(Q4) of true -> State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + q3 = ?QUEUE:in_r(MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. queue_out(State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of + case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; @@ -1073,15 +1082,15 @@ purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> - case bpqueue:is_empty(Q3) of + case ?QUEUE:is_empty(Q3) of true -> {LensByStore, State}; false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun beta_fold/3, Q3, + remove_queue_entries(fun ?QUEUE:foldl/3, Q3, LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { - q3 = bpqueue:new(), + q3 = ?QUEUE:new(), index_state = IndexState1 })) end. @@ -1132,9 +1141,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case bpqueue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), @@ -1327,85 +1336,50 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -alpha_funs() -> - #merge_funs { - new = fun queue:new/0, - join = fun queue:join/2, - out = fun queue:out/1, - in = fun queue:in/2, - publish = fun (#msg_status { msg = undefined } = MsgStatus, State) -> - read_msg(MsgStatus, State); - (MsgStatus, #vqstate { - ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { - ram_msg_count = RamMsgCount + 1 }} - end}. - -beta_funs() -> - #merge_funs { - new = fun bpqueue:new/0, - join = fun bpqueue:join/2, - out = fun (Q) -> - case bpqueue:out(Q) of - {{value, _IndexOnDisk, MsgStatus}, Q1} -> - {{value, MsgStatus}, Q1}; - {empty, _Q1} = X -> - X - end - end, - in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> - bpqueue:in(IOD, MsgStatus, Q) - end, - publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, - State) -> - {#msg_status { index_on_disk = IndexOnDisk, - msg = Msg} = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = - State1} = - maybe_write_to_disk(not MsgOnDisk, false, - MsgStatus, State), - {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + - one_if(Msg =/= undefined), - ram_index_count = RamIndexCount + - one_if(not IndexOnDisk) }} - end}. +publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> + read_msg(MsgStatus, State); +publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> + {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + +publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) -> + {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1, + #vqstate { ram_index_count = RamIndexCount, + ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), + {MsgStatus1, State1 #vqstate { + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, - MsgPropsFun, State) -> - queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State). +queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> + queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, + Limit, PubFun, MsgPropsFun, State). -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, - #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, - MsgPropsFun, State) +queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, + Limit, PubFun, MsgPropsFun, State) when Limit == undefined orelse SeqId < Limit -> - case QOut(Q) of + case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, - Limit, Funs, MsgPropsFun, State); + queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, + Limit, PubFun, MsgPropsFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State), {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - QPublish(MsgStatus, State1), - queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds], - Limit, Funs, MsgPropsFun, State2) + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, MsgPropsFun, State2) end; -queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, - _MsgPropsFun, State) -> - {SeqIds, QJoin(Front, Q), MsgIds, State}. +queue_merge(SeqIds, Q, Front, MsgIds, + _Limit, _PubFun, _MsgPropsFun, State) -> + {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; -delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, - count = Count, - end_seq_id = EndSeqId} = Delta, - MsgIds, MsgPropsFun, State) -> +delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> {#msg_status { msg_id = MsgId, index_on_disk = IndexOnDisk, @@ -1415,11 +1389,7 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, {_MsgStatus, State2} = maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, MsgStatus, State1), - {Delta0 #delta { - start_seq_id = lists:min([SeqId, StartSeqId]), - count = Count + 1, - end_seq_id = lists:max([SeqId + 1, EndSeqId]) }, - [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1430,13 +1400,13 @@ msg_from_pending_ack(SeqId, MsgPropsFun, State) -> msg_props = (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } }, State1}. -beta_limit(BPQ) -> - case bpqueue:out(BPQ) of - {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId; - {empty, _BPQ} -> undefined +beta_limit(Q) -> + case ?QUEUE:out(Q) of + {{value, #msg_status { seq_id = SeqId }}, _Q} -> SeqId; + {empty, _Q} -> undefined end. -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; +delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %%---------------------------------------------------------------------------- @@ -1462,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1497,13 +1467,10 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, {true, State2} end, - case State1 #vqstate.target_ram_count of - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} - end + case chunk_size(State1 #vqstate.ram_index_count, + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} end. limit_ram_acks(0, State) -> @@ -1525,54 +1492,27 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI1 }) end. - reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, - fun limit_ram_index/2, - fun push_betas_to_deltas/1, + fun push_betas_to_deltas/2, fun limit_ram_acks/2, State), State1. -limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, - index_state = IndexState, - ram_index_count = RamIndexCount }) -> - {Q2a, {Quota1, IndexState1}} = limit_ram_index( - fun bpqueue:map_fold_filter_r/4, - Q2, {Quota, IndexState}), - %% TODO: we shouldn't be writing index entries for messages that - %% can never end up in delta due them residing in the only segment - %% held by q3. - {Q3a, {Quota2, IndexState2}} = limit_ram_index( - fun bpqueue:map_fold_filter_r/4, - Q3, {Quota1, IndexState1}), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount - (Quota - Quota2) }. - -limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> - {Q, {0, IndexState}}; -limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> - MapFoldFilterFun( - fun erlang:'not'/1, - fun (MsgStatus, {0, _IndexStateN}) -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - stop; - (MsgStatus, {N, IndexStateN}) when N > 0 -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - {MsgStatus1, IndexStateN1} = - maybe_write_index_to_disk(true, MsgStatus, IndexStateN), - {true, m(MsgStatus1), {N-1, IndexStateN1}} - end, {Quota, IndexState}, Q). - -permitted_ram_index_count(#vqstate { len = 0 }) -> +permitted_beta_count(#vqstate { len = 0 }) -> infinity; -permitted_ram_index_count(#vqstate { len = Len, - q2 = Q2, - q3 = Q3, - delta = #delta { count = DeltaCount } }) -> - BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), - BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). +permitted_beta_count(#vqstate { len = Len, + q1 = Q1, + q3 = Q3, + q4 = Q4 }) -> + BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), + Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len), + case ?QUEUE:out(Q3) of + {empty, _Q3} -> Permitted; + {{value, #msg_status { seq_id = MinSeqId }}, _Q3} -> + lists:max([Permitted, rabbit_queue_index:next_segment_boundary( + MinSeqId) - MinSeqId]) + end. chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> @@ -1587,25 +1527,25 @@ fetch_from_q3(State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount}) -> - case bpqueue:out(Q3) of + case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus}, Q3a} -> + {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} -> RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = - case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of + case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we %% know q4 is empty otherwise we wouldn't be %% loading from q3. As such, we can just set %% q4 to Q1. - true = bpqueue:is_empty(Q2), %% ASSERTION - true = queue:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = queue:new(), + true = ?QUEUE:is_empty(Q2), %% ASSERTION + true = ?QUEUE:is_empty(Q4), %% ASSERTION + State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); @@ -1638,7 +1578,7 @@ maybe_deltas_to_betas(State = #vqstate { {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold @@ -1646,14 +1586,14 @@ maybe_deltas_to_betas(State = #vqstate { State1 #vqstate { delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), + Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { q2 = bpqueue:new(), + State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; + q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> Delta1 = #delta { start_seq_id = DeltaSeqId1, count = N, @@ -1670,23 +1610,21 @@ push_alphas_to_betas(Quota, State) -> maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( - fun queue:out/1, - fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + fun ?QUEUE:out/1, + fun (MsgStatus, Q1a, + State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State1 #vqstate { q1 = Q1a, - q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) }; - (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q1a, State1 = #vqstate { q2 = Q2 }) -> + q3 = ?QUEUE:in(MsgStatus, Q3) }; + (MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, - q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } + q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, Q1, State). maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( - fun queue:out_r/1, - fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, - Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + fun ?QUEUE:out_r/1, + fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) -> + State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota, Q4, State). @@ -1716,65 +1654,70 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> Consumer(MsgStatus2, Qa, State2)) end. -push_betas_to_deltas(State = #vqstate { q2 = Q2, +push_betas_to_deltas(Quota, + State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState, ram_index_count = RamIndexCount }) -> - {Delta2, Q2a, RamIndexCount2, IndexState2} = - push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun bpqueue:out/1, Q2, - RamIndexCount, IndexState), - {Delta3, Q3a, RamIndexCount3, IndexState3} = - push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun bpqueue:out_r/1, Q3, - RamIndexCount2, IndexState2), - Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), + PushState = {Quota, Delta, RamIndexCount, IndexState}, + {Q2a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q2, PushState), + {Q3a, PushState2} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun rabbit_queue_index:next_segment_boundary/1, + Q3, PushState1), + {_, Delta1, RamIndexCount1, IndexState1} = PushState2, State #vqstate { q2 = Q2a, - delta = Delta4, + delta = Delta1, q3 = Q3a, - index_state = IndexState3, - ram_index_count = RamIndexCount3 }. - -push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> - case bpqueue:out(Q) of - {empty, _Q} -> - {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} -> - {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} = - bpqueue:out_r(Q), + index_state = IndexState1, + ram_index_count = RamIndexCount1 }. + +push_betas_to_deltas(_Generator, _LimitFun, Q, + {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> + {Q, PushState}; +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> + case ?QUEUE:is_empty(Q) of + true -> + {Q, PushState}; + false -> + {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q), + {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of - true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {Len, Qc, RamIndexCount1, IndexState1} = - push_betas_to_deltas(Generator, Limit, Q, 0, - RamIndexCount, IndexState), - {#delta { start_seq_id = Limit, - count = Len, - end_seq_id = MaxSeqId + 1 }, - Qc, RamIndexCount1, IndexState1} + true -> {Q, PushState}; + false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) end end. -push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, + {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> + {Q, PushState}; +push_betas_to_deltas1(Generator, Limit, Q, + {Quota, Delta, RamIndexCount, IndexState} = PushState) -> case Generator(Q) of {empty, _Q} -> - {Count, Q, RamIndexCount, IndexState}; - {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} + {Q, PushState}; + {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Count, Q, RamIndexCount, IndexState}; - {{value, IndexOnDisk, MsgStatus}, Qa} -> - {RamIndexCount1, IndexState1} = + {Q, PushState}; + {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, + seq_id = SeqId }}, Qa} -> + {Quota1, RamIndexCount1, IndexState1} = case IndexOnDisk of - true -> {RamIndexCount, IndexState}; + true -> {Quota, RamIndexCount, IndexState}; false -> {#msg_status { index_on_disk = true }, IndexState2} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - {RamIndexCount - 1, IndexState2} + {Quota - 1, RamIndexCount - 1, IndexState2} end, - push_betas_to_deltas( - Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) + Delta1 = expand_delta(SeqId, Delta), + push_betas_to_deltas1(Generator, Limit, Qa, + {Quota1, Delta1, RamIndexCount1, IndexState1}) end. %%---------------------------------------------------------------------------- |
