diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-13 16:56:18 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-13 16:56:18 +0000 |
| commit | b7c1a4b15f129bf920adfc59881ca9498034499f (patch) | |
| tree | f7b1d40df41278680402be1da21d7620a85b93e1 /src | |
| parent | 1e4ecb8f788103cb2d52cbf551e16a3fe0d829f2 (diff) | |
| download | rabbitmq-server-git-b7c1a4b15f129bf920adfc59881ca9498034499f.tar.gz | |
Refactoring of vq - pulled out the inlined block-prefix queue code and generally tidied profusely. Also efficiency fix in remove_queue_entries by avoiding an intermediate list (which could potentially be massive).
Diffstat (limited to 'src')
| -rw-r--r-- | src/bpqueue.erl | 185 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 330 |
3 files changed, 324 insertions, 200 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl new file mode 100644 index 0000000000..5e7471f71d --- /dev/null +++ b/src/bpqueue.erl @@ -0,0 +1,185 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(bpqueue). + +%% Block-prefixed queue. This implements a queue of queues, but +%% supporting the normal queue interface. Each block has a prefix and +%% it is guaranteed that no two consecutive blocks have the same +%% prefix. len/1 returns the flattened length of the queue and is O(1) + +-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2, + fold/3, from_list/1, to_list/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(bpqueue() :: {non_neg_integer(), queue()}). +-type(prefix() :: any()). +-type(value() :: any()). +-type(result() :: {'empty', bpqueue()} | + {{'value', prefix(), value()}, bpqueue()}). + +-spec(new/0 :: () -> bpqueue()). +-spec(is_empty/1 :: (bpqueue()) -> boolean()). +-spec(len/1 :: (bpqueue()) -> non_neg_integer()). +-spec(in/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). +-spec(in_r/3 :: (prefix(), value(), bpqueue()) -> bpqueue()). +-spec(out/1 :: (bpqueue()) -> result()). +-spec(out_r/1 :: (bpqueue()) -> result()). +-spec(join/2 :: (bpqueue(), bpqueue()) -> bpqueue()). +-spec(fold/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). +-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). +-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {0, queue:new()}. + +is_empty({0, _Q}) -> + true; +is_empty(_BPQ) -> + false. + +len({N, _Q}) -> + N. + +in(Prefix, Value, {0, Q}) -> + {1, queue:in({Prefix, queue:in(Value, Q)}, Q)}; +in(Prefix, Value, {N, Q}) -> + {N+1, + case queue:out_r(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in({Prefix, queue:in(Value, InnerQ)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in({Prefix, queue:in(Value, queue:new())}, Q) + end}. + +in_r(Prefix, Value, {0, Q}) -> + {1, queue:in({Prefix, queue:in(Value, Q)}, Q)}; +in_r(Prefix, Value, {N, Q}) -> + {N+1, + case queue:out(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in_r({Prefix, queue:in_r(Value, InnerQ)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in_r({Prefix, queue:in(Value, queue:new())}, Q) + end}. + +out({0, _Q} = BPQ) -> + {empty, BPQ}; +out({N, Q}) -> + {{value, {Prefix, InnerQ}}, Q1} = queue:out(Q), + {{value, Value}, InnerQ1} = queue:out(InnerQ), + Q2 = case queue:is_empty(InnerQ1) of + true -> Q1; + false -> queue:in_r({Prefix, InnerQ1}, Q1) + end, + {{value, Prefix, Value}, {N-1, Q2}}. + +out_r({0, _Q} = BPQ) -> + {empty, BPQ}; +out_r({N, Q}) -> + {{value, {Prefix, InnerQ}}, Q1} = queue:out_r(Q), + {{value, Value}, InnerQ1} = queue:out_r(InnerQ), + Q2 = case queue:is_empty(InnerQ1) of + true -> Q1; + false -> queue:in({Prefix, InnerQ1}, Q1) + end, + {{value, Prefix, Value}, {N-1, Q2}}. + +join({0, _Q}, BPQ) -> + BPQ; +join(BPQ, {0, _Q}) -> + BPQ; +join({NHead, QHead}, {NTail, QTail}) -> + {{value, {Prefix, InnerQHead}}, QHead1} = queue:out_r(QHead), + {NHead + NTail, + case queue:out(QTail) of + {{value, {Prefix, InnerQTail}}, QTail1} -> + queue:join( + queue:in({Prefix, queue:join(InnerQHead, InnerQTail)}, QHead1), + QTail1); + {{value, {_Prefix, _InnerQTail}}, _QTail1} -> + queue:join(QHead, QTail) + end}. + +fold(_Fun, Init, {0, _Q}) -> + Init; +fold(Fun, Init, {_N, Q}) -> + fold1(Fun, Init, Q). + +fold1(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> + Init; + {{value, {Prefix, InnerQ}}, Q1} -> + fold1(Fun, fold1(Fun, Prefix, Init, InnerQ), Q1) + end. + +fold1(Fun, Prefix, Init, InnerQ) -> + case queue:out(InnerQ) of + {empty, _Q} -> + Init; + {{value, Value}, InnerQ1} -> + fold1(Fun, Prefix, Fun(Prefix, Value, Init), InnerQ1) + end. + +from_list(List) -> + {FinalPrefix, FinalInnerQ, ListOfPQs1, Len} = + lists:foldl( + fun ({_Prefix, []}, Acc) -> + Acc; + ({Prefix, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> + {Prefix, queue:join(InnerQ, queue:from_list(InnerList)), + ListOfPQs, LenAcc + length(InnerList)}; + ({Prefix1, InnerList}, {Prefix, InnerQ, ListOfPQs, LenAcc}) -> + {Prefix1, queue:from_list(InnerList), + [{Prefix, InnerQ} | ListOfPQs], LenAcc + length(InnerList)} + end, {undefined, queue:new(), [], 0}, List), + ListOfPQs2 = [{FinalPrefix, FinalInnerQ} | ListOfPQs1], + [{undefined, InnerQ1} | Rest] = All = lists:reverse(ListOfPQs2), + {Len, queue:from_list(case queue:is_empty(InnerQ1) of + true -> Rest; + false -> All + end)}. + +to_list({0, _Q}) -> + []; +to_list({_N, Q}) -> + lists:map(fun to_list1/1, queue:to_list(Q)). + +to_list1({Prefix, InnerQ}) -> + {Prefix, queue:to_list(InnerQ)}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 23666a5f3d..2b5fe4c746 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,7 +55,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, queue_fold/3]). -import(mnesia). -import(lists). @@ -126,6 +126,7 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -endif. @@ -489,3 +490,9 @@ ceil(N) -> true -> T; false -> 1 + T end. + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6c7fad1212..8205f79f66 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -114,15 +114,16 @@ -ifdef(use_specs). +-type(bpqueue() :: any()). -type(msg_id() :: binary()). -type(seq_id() :: non_neg_integer()). -type(ack() :: {'ack_index_and_store', msg_id(), seq_id()} | 'ack_not_on_disk'). -type(vqstate() :: #vqstate { q1 :: queue(), - q2 :: {non_neg_integer(), queue()}, + q2 :: bpqueue(), delta :: delta(), - q3 :: {non_neg_integer(), queue()}, + q3 :: bpqueue(), q4 :: queue(), duration_target :: non_neg_integer(), target_ram_msg_count :: non_neg_integer(), @@ -196,9 +197,9 @@ init(QueueName) -> end, Now = now(), State = - #vqstate { q1 = queue:new(), q2 = {0, queue:new()}, + #vqstate { q1 = queue:new(), q2 = bpqueue:new(), delta = Delta, - q3 = {0, queue:new()}, q4 = queue:new(), + q3 = bpqueue:new(), q4 = queue:new(), duration_target = undefined, target_ram_msg_count = undefined, ram_msg_count = 0, @@ -371,7 +372,8 @@ is_empty(State) -> 0 == len(State). purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> - {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState), + {Q4Count, IndexState1} = + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), @@ -495,8 +497,7 @@ flush_journal(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush_journal(IndexState) }. -status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2}, - delta = Delta, q3 = {Q3Len, _Q3}, q4 = Q4, +status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, @@ -505,9 +506,9 @@ status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2}, avg_ingress_rate = AvgIngressRate, next_seq_id = NextSeqId }) -> [ {q1, queue:len(Q1)}, - {q2, Q2Len}, + {q2, bpqueue:len(Q2)}, {delta, Delta}, - {q3, Q3Len}, + {q3, bpqueue:len(Q3)}, {q4, queue:len(Q4)}, {len, Len}, {outstanding_txns, length(From)}, @@ -532,44 +533,17 @@ persistent_msg_ids(Pubs) -> Obj #basic_message.is_persistent]. betas_from_segment_entries(List, SeqIdLimit) -> - List1 = [#msg_status { msg = undefined, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true - } - || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, - SeqId < SeqIdLimit ], - {length(List1), queue:from_list([{true, queue:from_list(List1)}])}. - -join_betas({HeadLen, Head}, {TailLen, Tail}) -> - {HeadLen + TailLen, join_betas1(Head, Tail)}. - -join_betas1(Head, Tail) -> - case {queue:out_r(Head), queue:out(Tail)} of - {{empty, _Head}, _} -> - Tail; - {_, {empty, _Tail}} -> - Head; - {{{value, {IndexOnDisk, InnerQHead}}, Head1}, - {{value, {IndexOnDisk, InnerQTail}}, Tail1}} -> - queue:join( - queue:in({IndexOnDisk, - queue:join(InnerQHead, InnerQTail)}, Head1), - Tail1); - {_, _} -> queue:join(Head, Tail) - end. - -grab_beta(Gen, Q) -> - case Gen(Q) of - {empty, _Q} -> - empty; - {{value, {_IndexOnDisk, InnerQ}}, _Q} -> - {{value, MsgStatus}, _InnerQ} = Gen(InnerQ), - MsgStatus - end. + bpqueue:from_list([{true, + [#msg_status { msg = undefined, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + } + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, + SeqId < SeqIdLimit ]}]). read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -596,6 +570,11 @@ combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow}, true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION #delta { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }. +beta_fold_no_index_on_disk(Fun, Init, Q) -> + bpqueue:fold(fun (_Prefix, Value, Acc) -> + Fun(Value, Acc) + end, Init, Q). + %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -609,50 +588,34 @@ delete1(NextSeqId, Count, DeltaSeqId, IndexState) -> {[], IndexState1} -> delete1(NextSeqId, Count, Delta1SeqId, IndexState1); {List, IndexState1} -> - {QCount, Q} = betas_from_segment_entries(List, Delta1SeqId), - {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), + Q = betas_from_segment_entries(List, Delta1SeqId), + {QCount, IndexState2} = + remove_queue_entries(fun beta_fold_no_index_on_disk/3, + Q, IndexState1), delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2) end. -purge1(Count, State = #vqstate { q3 = {Q3Len, Q3}, index_state = IndexState }) -> - case 0 == Q3Len of +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> + case bpqueue:is_empty(Q3) of true -> {Q1Count, IndexState1} = - remove_queue_entries(State #vqstate.q1, IndexState), + remove_queue_entries(fun rabbit_misc:queue_fold/3, + State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), index_state = IndexState1 }}; false -> - {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), + {Q3Count, IndexState1} = + remove_queue_entries(fun beta_fold_no_index_on_disk/3, + Q3, IndexState), purge1(Count + Q3Count, maybe_deltas_to_betas( State #vqstate { index_state = IndexState1, - q3 = {0, queue:new()} })) + q3 = bpqueue:new() })) end. -remove_queue_entries(Q, IndexState) -> +remove_queue_entries(Fold, Q, IndexState) -> {Count, MsgIds, SeqIds, IndexState1} = - lists:foldl( - fun (#msg_status { msg_id = MsgId, seq_id = SeqId, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> - MsgIdsAcc1 = case MsgOnDisk of - true -> [MsgId | MsgIdsAcc]; - false -> MsgIdsAcc - end, - SeqIdsAcc1 = case IndexOnDisk of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - IndexStateN1 = case IndexOnDisk andalso not IsDelivered of - true -> rabbit_queue_index:write_delivered( - SeqId, IndexStateN); - false -> IndexStateN - end, - {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1} - %% we need to write the delivered records in order otherwise - %% we upset the qi. So don't reverse. - end, {0, [], [], IndexState}, queue:to_list(Q)), + Fold(fun remove_queue_entries1/2, {0, [], [], IndexState}, Q), ok = case MsgIds of [] -> ok; _ -> rabbit_msg_store:remove(MsgIds) @@ -664,28 +627,40 @@ remove_queue_entries(Q, IndexState) -> end, {Count, IndexState2}. +remove_queue_entries1( + #msg_status { msg_id = MsgId, seq_id = SeqId, + is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + MsgIdsAcc1 = case MsgOnDisk of + true -> [MsgId | MsgIdsAcc]; + false -> MsgIdsAcc + end, + SeqIdsAcc1 = case IndexOnDisk of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, + IndexStateN1 = case IndexOnDisk andalso not IsDelivered of + true -> rabbit_queue_index:write_delivered( + SeqId, IndexStateN); + false -> IndexStateN + end, + {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}. + fetch_from_q3_or_delta(State = #vqstate { - q1 = Q1, q2 = {Q2Len, _Q2}, delta = #delta { count = DeltaCount }, - q3 = {Q3Len, Q3}, q4 = Q4, ram_msg_count = RamMsgCount, + q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, + q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, msg_store_read_state = MSCState }) -> - case queue:out(Q3) of + case bpqueue:out(Q3) of {empty, _Q3} -> 0 = DeltaCount, %% ASSERTION - 0 = Q2Len, %% ASSERTION - 0 = Q3Len, %% ASSERTION + true = bpqueue:is_empty(Q2), %% ASSERTION true = queue:is_empty(Q1), %% ASSERTION {empty, State}; - {{value, {IndexOnDisk, InnerQ}}, Q3a} -> - {{value, MsgStatus = #msg_status { - msg = undefined, msg_id = MsgId, - is_persistent = IsPersistent - }}, InnerQ1} = queue:out(InnerQ), - Q3LenB = Q3Len - 1, - Q3b = {Q3LenB, case queue:is_empty(InnerQ1) of - true -> Q3a; - false -> queue:in_r({IndexOnDisk, InnerQ1}, Q3a) - end}, + {{value, IndexOnDisk, MsgStatus = #msg_status { + msg = undefined, msg_id = MsgId, + is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }}, MSCState1} = rabbit_msg_store:read(MsgId, MSCState), @@ -695,17 +670,17 @@ fetch_from_q3_or_delta(State = #vqstate { false -> RamIndexCount - 1 end, true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3b, q4 = Q4a, + State1 = State #vqstate { q3 = Q3a, q4 = Q4a, ram_msg_count = RamMsgCount + 1, ram_index_count = RamIndexCount1, msg_store_read_state = MSCState1 }, State2 = - case {0 == Q3LenB, 0 == DeltaCount} of + case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and q1 %% can now be joined onto q4 - 0 = Q2Len, %% ASSERTION + true = bpqueue:is_empty(Q2), %% ASSERTION State1 #vqstate { q1 = queue:new(), q4 = queue:join(Q4a, Q1) }; {true, false} -> @@ -737,26 +712,26 @@ reduce_memory_use(State = test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, - q1 = Q1, q3 = {_Q3Len, Q3} }) -> + q1 = Q1, q3 = Q3 }) -> case TargetRamMsgCount of undefined -> msg; 0 -> - case queue:out(Q3) of + case bpqueue:out(Q3) of {empty, _Q3} -> %% if TargetRamMsgCount == 0, we know we have no %% alphas. If q3 is empty then delta must be empty %% too, so create a beta, which should end up in %% q3 index; - {{value, {_IndexOnDisk, InnerQ}}, _Q3a} -> - {{value, #msg_status { seq_id = OldSeqId }}, _InnerQ} = - queue:out(InnerQ), + {{value, _IndexOnDisk, #msg_status { seq_id = OldSeqId }}, + _Q3a} -> %% Don't look at the current delta as it may be %% empty. If the SeqId is still within the current %% segment, it'll be a beta, else it'll go into %% delta - case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of + case SeqId >= rabbit_queue_index:next_segment_boundary( + OldSeqId) of true -> neither; false -> index end @@ -817,13 +792,13 @@ publish(index, MsgStatus, State = store_beta_entry(MsgStatus2, State1); publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = - #vqstate { index_state = IndexState, q1 = Q1, q2 = {Q2Len, _Q2}, + #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, delta = Delta }) -> MsgStatus1 = #msg_status { msg_on_disk = true } = maybe_write_msg_to_disk(true, MsgStatus), {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus1, IndexState), - true = queue:is_empty(Q1) andalso 0 == Q2Len, %% ASSERTION + true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION %% delta may be empty, seq_id > next_segment_boundary from q3 %% head, so we need to find where the segment boundary is before %% or equal to seq_id @@ -835,39 +810,28 @@ publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = delta = combine_deltas(Delta, Delta1) }. store_alpha_entry(MsgStatus, State = - #vqstate { q1 = Q1, q2 = {Q2Len, _Q2}, + #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, - q3 = {Q3Len, _Q3}, q4 = Q4 }) -> - case 0 == Q2Len andalso 0 == DeltaCount andalso 0 == Q3Len of + q3 = Q3, q4 = Q4 }) -> + case bpqueue:is_empty(Q2) andalso 0 == DeltaCount andalso + bpqueue:is_empty(Q3) of true -> true = queue:is_empty(Q1), %% ASSERTION State #vqstate { q4 = queue:in(MsgStatus, Q4) }; false -> maybe_push_q1_to_betas( State #vqstate { q1 = queue:in(MsgStatus, Q1) }) end. -store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true }, - State = #vqstate { q2 = {Q2Len, Q2}, +store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, + index_on_disk = IndexOnDisk }, + State = #vqstate { q2 = Q2, delta = #delta { count = DeltaCount }, - q3 = {Q3Len, Q3} }) -> + q3 = Q3 }) -> MsgStatus1 = MsgStatus #msg_status { msg = undefined }, case DeltaCount == 0 of - true -> State #vqstate { q3 = {Q3Len + 1, - store_beta_entry1( - fun queue:out_r/1, fun queue:in/2, - MsgStatus1, Q3)} }; - false -> State #vqstate { q2 = {Q2Len + 1, - store_beta_entry1( - fun queue:out_r/1, fun queue:in/2, - MsgStatus1, Q2)} } - end. - -store_beta_entry1(Gen, Cons, MsgStatus = - #msg_status { index_on_disk = IndexOnDisk }, Q) -> - case Gen(Q) of - {{value, {IndexOnDisk, InnerQ}}, QTail} -> - Cons({IndexOnDisk, Cons(MsgStatus, InnerQ)}, QTail); - {_EmptyOrNotIndexOnDisk, _QTail} -> - Cons({IndexOnDisk, Cons(MsgStatus, queue:new())}, Q) + true -> + State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) }; + false -> + State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } end. maybe_write_msg_to_disk(_Force, MsgStatus = @@ -909,13 +873,12 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) -> State; maybe_deltas_to_betas( - State = #vqstate { index_state = IndexState, - q2 = Q2All, q3 = {Q3Len, _Q3} = Q3All, + State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, target_ram_msg_count = TargetRamMsgCount, delta = #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd }}) -> - case (0 < Q3Len) andalso (0 == TargetRamMsgCount) of + case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of true -> State; false -> @@ -927,19 +890,18 @@ maybe_deltas_to_betas( State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. But %% it can't be [] - Q3bAll = {Q3bLen, _Q3b} = - betas_from_segment_entries(List, DeltaSeqIdEnd), - Q3a = join_betas(Q3All, Q3bAll), - case DeltaCount - Q3bLen of + Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd), + Q3b = bpqueue:join(Q3, Q3a), + case DeltaCount - bpqueue:len(Q3a) of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 State1 #vqstate { delta = ?BLANK_DELTA, - q2 = {0, queue:new()}, - q3 = join_betas(Q3a, Q2All) }; + q2 = bpqueue:new(), + q3 = bpqueue:join(Q3b, Q2) }; N when N > 0 -> State1 #vqstate { - q3 = Q3a, + q3 = Q3b, delta = #delta { start_seq_id = Delta1SeqId, count = N, end_seq_id = DeltaSeqIdEnd } } @@ -957,13 +919,12 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( fun queue:out_r/1, - fun (MsgStatus, Q4a, State1 = #vqstate { q3 = {Q3Len, Q3} }) -> + fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, + Q4a, State1 = #vqstate { q3 = Q3 }) -> MsgStatus1 = MsgStatus #msg_status { msg = undefined }, %% these must go to q3 - State1 #vqstate { q3 = {Q3Len + 1, - store_beta_entry1( - fun queue:out/1, fun queue:in_r/2, - MsgStatus1, Q3)}, q4 = Q4a } + State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus1, Q3), + q4 = Q4a } end, Q4, State). maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = @@ -1000,43 +961,36 @@ maybe_push_alphas_to_betas( Consumer(MsgStatus2, Qa, State1)) end. -push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta, - q3 = {Q3Len, Q3}, +push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, ram_index_count = RamIndexCount, index_state = IndexState }) -> %% HighSeqId is high in the sense that it must be higher than the %% seq_id in Delta, but it's also the lowest of the betas that we %% transfer from q2 to delta. - {HighSeqId, Q2Len, Q2a, RamIndexCount1, IndexState1} = + {HighSeqId, Len1, Q2a, RamIndexCount1, IndexState1} = push_betas_to_deltas( - fun queue:out/1, - fun (IndexOnDisk, InnerQ, Q) -> - join_betas1(queue:from_list([{IndexOnDisk, InnerQ}]), Q) - end, undefined, Q2, RamIndexCount, IndexState), - true = queue:is_empty(Q2a), %% ASSERTION - EndSeqId = case queue:out_r(Q2) of - {empty, _Q2} -> - undefined; - {{value, {_IndexOnDisk, InnerQ}}, _Q2} -> - {{value, #msg_status { seq_id = EndSeqId1 }}, _InnerQ} = - queue:out_r(InnerQ), - EndSeqId1 + 1 - end, + fun bpqueue:out/1, undefined, Q2, RamIndexCount, IndexState), + true = bpqueue:is_empty(Q2a), %% ASSERTION + EndSeqId = + case bpqueue:out_r(Q2) of + {empty, _Q2} -> + undefined; + {{value, _IndexOnDisk, #msg_status { seq_id = EndSeqId1 }}, _Q2} -> + EndSeqId1 + 1 + end, Delta1 = #delta { start_seq_id = Delta1SeqId } = combine_deltas(Delta, #delta { start_seq_id = HighSeqId, - count = Q2Len, + count = Len1, end_seq_id = EndSeqId }), - State1 = State #vqstate { q2 = {0, Q2a}, delta = Delta1, + State1 = State #vqstate { q2 = bpqueue:new(), delta = Delta1, index_state = IndexState1, ram_index_count = RamIndexCount1 }, - case queue:out(Q3) of + case bpqueue:out(Q3) of {empty, _Q3} -> State1; - {{value, {_IndexOnDisk1, InnerQ1}}, _Q3} -> - {{value, #msg_status { seq_id = SeqId }}, _InnerQ1} = - queue:out(InnerQ1), - #msg_status { seq_id = SeqIdMax } = - grab_beta(fun queue:out_r/1, Q3), + {{value, _IndexOnDisk1, #msg_status { seq_id = SeqId }}, _Q3} -> + {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Q3a} = + bpqueue:out_r(Q3), Limit = rabbit_queue_index:next_segment_boundary(SeqId), %% ASSERTION true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax, @@ -1062,58 +1016,37 @@ push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta, %% But because we use queue:out_r, SeqIdMax is %% actually also the highest seq_id of the betas we %% transfer from q3 to deltas. - {SeqIdMax, Len2, Q3b, RamIndexCount2, IndexState2} = - push_betas_to_deltas( - fun queue:out_r/1, - fun (IndexOnDisk, InnerQ, Q) -> - join_betas1(Q, queue:from_list( - [{IndexOnDisk, InnerQ}])) - end, Limit, Q3, RamIndexCount1, IndexState1), + {SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} = + push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3, + RamIndexCount1, IndexState1), Delta2 = combine_deltas(#delta { start_seq_id = Limit, count = Len2, end_seq_id = SeqIdMax+1 }, Delta1), - State1 #vqstate { q3 = {Q3Len - Len2, Q3b}, delta = Delta2, + State1 #vqstate { q3 = Q3a, delta = Delta2, index_state = IndexState2, ram_index_count = RamIndexCount2 } end end. -push_betas_to_deltas( - Generator, Consumer, Limit, Q, RamIndexCount, IndexState) -> +push_betas_to_deltas(Generator, Limit, Q, RamIndexCount, IndexState) -> case Generator(Q) of {empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState}; - {{value, {IndexOnDisk, InnerQ}}, Qa} -> - {{value, #msg_status { seq_id = SeqId }}, _Qb} = Generator(InnerQ), + {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} -> {Count, Qb, RamIndexCount1, IndexState1} = push_betas_to_deltas( - Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, 0, - RamIndexCount, IndexState), + Generator, Limit, Q, 0, RamIndexCount, IndexState), {SeqId, Count, Qb, RamIndexCount1, IndexState1} end. -push_betas_to_deltas( - Generator, Consumer, Limit, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of {empty, Qa} -> {Count, Qa, RamIndexCount, IndexState}; - {{value, {IndexOnDisk, InnerQ}}, Qa} -> - push_betas_to_deltas( - Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, Count, - RamIndexCount, IndexState) - end. - -push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q, - Count, RamIndexCount, IndexState) -> - case Generator(InnerQ) of - {empty, _InnerQ} -> - push_betas_to_deltas(Generator, Consumer, Limit, Q, Count, - RamIndexCount, IndexState); - {{value, #msg_status { seq_id = SeqId }}, _InnerQ} + {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} when Limit /= undefined andalso SeqId < Limit -> - {Count, Consumer(IndexOnDisk, InnerQ, Q), RamIndexCount, - IndexState}; - {{value, MsgStatus}, InnerQa} -> + {Count, Q, RamIndexCount, IndexState}; + {{value, IndexOnDisk, MsgStatus}, Qa} -> {RamIndexCount1, IndexState1} = case IndexOnDisk of true -> {RamIndexCount, IndexState}; @@ -1124,6 +1057,5 @@ push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q, {RamIndexCount - 1, IndexState2} end, push_betas_to_deltas( - Generator, Consumer, Limit, IndexOnDisk, InnerQa, Q, Count + 1, - RamIndexCount1, IndexState1) + Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. |
