diff options
| -rw-r--r-- | src/q3tree.erl | 107 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 148 |
2 files changed, 85 insertions, 170 deletions
diff --git a/src/q3tree.erl b/src/q3tree.erl deleted file mode 100644 index a5fcd74014..0000000000 --- a/src/q3tree.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(q3tree). - -%% A less general random access variation of bpqueue for message status records - --export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, least_key/1, - join/2, join_bpqueue/2, foldr/3, from_batch/1, map_fold_filter_r/4]). - --include("rabbit.hrl"). --include("rabbit_backing_queue.hrl"). - -new() -> - gb_trees:empty(). - -is_empty(T) -> - gb_trees:is_empty(T). - -len(T) -> - gb_trees:size(T). - -in(IndexOnDisk, MsgStatus, Tree) -> in_r(IndexOnDisk, MsgStatus, Tree). - -in_r(IndexOnDisk, - #msg_status { seq_id = SeqId, index_on_disk = IndexOnDisk } = MsgStatus, - Tree) -> - gb_trees:insert(SeqId, MsgStatus, Tree); -in_r(IndexOnDisk, _Msgstatus, _Tree) -> - throw({prefix_and_msg_disagree, IndexOnDisk}). - -out(Tree) -> out1(Tree, fun gb_trees:take_smallest/1). -out_r(Tree) -> out1(Tree, fun gb_trees:take_largest/1). - -out1(Tree, TakeFun) -> - case gb_trees:is_empty(Tree) of - true -> {empty, Tree}; - false -> {_Key, #msg_status { index_on_disk = IndexOnDisk } = MsgStatus, - Tree2} = TakeFun(Tree), - {{value, IndexOnDisk, MsgStatus}, Tree2} - end. - -least_key(Tree) -> - {Least, _} = gb_trees:smallest(Tree), - Least. - -join(T1, T2) -> - join1(gb_trees:iterator(T1), T2). -join1(Iter, T) -> - case gb_trees:next(Iter) of - none -> T; - {_SeqId, - #msg_status { index_on_disk = IndexOnDisk } = MsgStatus, - Iter1} -> join1(Iter1, in_r(IndexOnDisk, MsgStatus, T)) - end. - -join_bpqueue(T, Q) -> - bpqueue:foldr(fun (IndexOnDisk, MsgStatus, Tree) -> - in_r(IndexOnDisk, MsgStatus, Tree) - end, T, Q). - -foldr(Fun, Acc, Tree) -> - lists:foldr(Fun, Acc, gb_trees:to_list(Tree)). - -from_batch({IndexOnDisk, L}) -> - lists:foldl(fun (MsgStatus, Tree) -> - in_r(IndexOnDisk, MsgStatus, Tree) - end, new(), L). - -map_fold_filter_r(PFilter, Fun, Acc, Tree) -> - map_fold_filter_r1(PFilter, Fun, Acc, Tree, new()). - -map_fold_filter_r1(PFilter, Fun, Acc, TreeOld, TreeNew) -> - case out_r(TreeOld) of - {empty, _T} -> {TreeNew, Acc}; - {{value, - IndexOnDisk, #msg_status{index_on_disk = IndexOnDisk} = MsgStatus}, - TreeOld1} -> - case PFilter(IndexOnDisk) of - false -> - map_fold_filter_r1(PFilter, Fun, Acc, TreeOld1, - in_r(IndexOnDisk, MsgStatus, TreeNew)); - true -> - case Fun(MsgStatus, Acc) of - stop -> - {join(TreeOld, TreeNew), Acc}; - {IndexOnDisk1, MsgStatus1, Acc1} -> - map_fold_filter_r1(PFilter, Fun, Acc1, TreeOld1, - in_r(IndexOnDisk1, MsgStatus1, - TreeNew)) - end - end - end. - diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 56280e1523..ff0509a7b4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -307,7 +307,7 @@ q1 :: queue(), q2 :: bpqueue:bpqueue(), delta :: delta(), - q3 :: gb_tree(), + q3 :: bpqueue:bpqueue(), q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict(), @@ -684,7 +684,7 @@ status(#vqstate { [ {q1 , queue:len(Q1)}, {q2 , bpqueue:len(Q2)}, {delta , Delta}, - {q3 , q3tree:len(Q3)}, + {q3 , bpqueue:len(Q3)}, {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, @@ -717,7 +717,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E1 = queue:is_empty(Q1), E2 = bpqueue:is_empty(Q2), ED = Delta#delta.count == 0, - E3 = q3tree:is_empty(Q3), + E3 = bpqueue:is_empty(Q3), E4 = queue:is_empty(Q4), LZ = Len == 0, @@ -813,29 +813,33 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, IndexState) -> +betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, - {Filtered1, Delivers1, Acks1}) -> + {Filtered1, Delivers1, Acks1} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {[m(#msg_status { msg = undefined, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }) | Filtered1], - Delivers1, - Acks1} + false -> case AckedFun(SeqId) of + false -> {[m(#msg_status { + msg = undefined, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps + }) | Filtered1], + Delivers1, + Acks1}; + true -> Acc + end end end, {[], [], []}, List), - {q3tree:from_batch({true, Filtered}), + {bpqueue:from_list([{true, Filtered}]), rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. @@ -866,7 +870,7 @@ combine_deltas(#delta { start_seq_id = StartLow, #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. beta_fold(Fun, Init, Q) -> - q3tree:foldr(fun ({_SeqID, Value}, Acc) -> Fun(Value, Acc) end, Init, Q). + bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -892,7 +896,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q1 = queue:new(), q2 = bpqueue:new(), delta = Delta, - q3 = q3tree:new(), + q3 = bpqueue:new(), q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), @@ -935,7 +939,7 @@ in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> case queue:is_empty(Q4) of true -> State #vqstate { - q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), @@ -993,11 +997,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of - {false, true, false, _, _} -> Rem(), IndexState1; - {false, true, true, _, _} -> Rem(), Ack(); - { true, true, true, false, false} -> Ack(); - _ -> IndexState1 + case {AckRequired, MsgOnDisk, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1024,7 +1027,7 @@ purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> - case q3tree:is_empty(Q3) of + case bpqueue:is_empty(Q3) of true -> {LensByStore, State}; false -> {LensByStore1, IndexState1} = remove_queue_entries(fun beta_fold/3, Q3, @@ -1032,7 +1035,7 @@ purge_betas_and_deltas(LensByStore, purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { - q3 = q3tree:new(), + q3 = bpqueue:new(), index_state = IndexState1 })) end. @@ -1083,7 +1086,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case q3tree:is_empty(Q3) of + State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, @@ -1166,7 +1169,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = + {MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1178,7 +1181,7 @@ remove_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(PersistentSeqIds, IndexState), + rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } @@ -1187,7 +1190,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, + {{MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1202,7 +1205,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + IndexState1 = rabbit_queue_index:ack(AckTags, IndexState), [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( @@ -1212,18 +1215,17 @@ ack(MsgStoreFun, Fun, AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false, msg_id = MsgId }, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + {MsgIdsByStore, AllMsgIds}) -> + {MsgIdsByStore, [MsgId | AllMsgIds]}; +accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, + {MsgIdsByStore, AllMsgIds}) -> + {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> @@ -1351,9 +1353,9 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId, #vqstate { q3 = Q3 } = State1} = maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), State1 #vqstate { - q3 = q3tree:in_r(IndexOnDisk1, - MsgStatus1, - Q3), + q3 = q3_merge(IndexOnDisk1, + MsgStatus1, + Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }; @@ -1374,8 +1376,7 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId, pick_store(SeqId, #vqstate { q3 = Q3, delta = #delta { start_seq_id = DeltaLimit } = Delta}) -> - case q3tree:is_empty(Q3) orelse - not q3tree:is_empty(Q3) andalso SeqId < q3tree:least_key(Q3) of + case bpqueue:is_empty(Q3) orelse SeqId < q3_least_key(Q3) of true -> q4; false -> BlankDelta = case Delta of ?BLANK_DELTA_PATTERN(X) -> true; @@ -1387,6 +1388,24 @@ pick_store(SeqId, #vqstate { q3 = Q3, end end. +q3_least_key(BPQ) -> + {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = bpqueue:out(BPQ), + SeqId. + +q3_merge(IndexOnDisk, MsgStatus, Q) -> + q3_merge(IndexOnDisk, MsgStatus, Q, bpqueue:new()). + +q3_merge(IndexOnDisk, #msg_status {seq_id = SeqId } = MsgStatus, Q, Front) -> + case bpqueue:out(Q) of + {{value, IndexOnDiskHead, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} -> + case SeqId1 > SeqId of + true -> bpqueue:join(bpqueue:in(IndexOnDisk, MsgStatus, Front), Q); + false -> q3_merge(IndexOnDisk, MsgStatus, Q1, bpqueue:in(IndexOnDiskHead, MsgStatusHead, Front)) + end; + {empty, _Q1} -> + bpqueue:in(IndexOnDisk, MsgStatus, Front) + end. + q4_merge(MsgStatus, Q) -> q4_merge(MsgStatus, Q, queue:new()). @@ -1508,7 +1527,7 @@ limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, %% can never end up in delta due them residing in the only segment %% held by q3. {Q3a, {Quota2, IndexState2}} = limit_ram_index( - fun q3tree:map_fold_filter_r/4, + fun bpqueue:map_fold_filter_r/4, Q3, {Quota1, IndexState1}), State #vqstate { q2 = Q2a, q3 = Q3a, index_state = IndexState2, @@ -1535,7 +1554,7 @@ permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3, delta = #delta { count = DeltaCount } }) -> - BetaLen = bpqueue:len(Q2) + q3tree:len(Q3), + BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). chunk_size(Current, Permitted) @@ -1551,7 +1570,7 @@ fetch_from_q3(State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount}) -> - case q3tree:out(Q3) of + case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, IndexOnDisk, MsgStatus}, Q3a} -> @@ -1560,7 +1579,7 @@ fetch_from_q3(State = #vqstate { State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = - case {q3tree:is_empty(Q3a), 0 == DeltaCount} of + case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we @@ -1589,6 +1608,7 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + pending_ack = PA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1599,9 +1619,11 @@ maybe_deltas_to_betas(State = #vqstate { {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = - betas_from_index_entries(List, TransientThreshold, IndexState1), + betas_from_index_entries(List, TransientThreshold, + fun (SeqId) -> dict:is_key(SeqId, PA) end, + IndexState1), State1 = State #vqstate { index_state = IndexState2 }, - case q3tree:len(Q3a) of + case bpqueue:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold @@ -1609,14 +1631,14 @@ maybe_deltas_to_betas(State = #vqstate { State1 #vqstate { delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); Q3aLen -> - Q3b = q3tree:join(Q3, Q3a), + Q3b = bpqueue:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 State1 #vqstate { q2 = bpqueue:new(), delta = ?BLANK_DELTA, - q3 = q3tree:join_bpqueue(Q3b, Q2) }; + q3 = bpqueue:join(Q3b, Q2) }; N when N > 0 -> Delta1 = #delta { start_seq_id = DeltaSeqId1, count = N, @@ -1637,7 +1659,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State1 #vqstate { q1 = Q1a, - q3 = q3tree:in(IndexOnDisk, MsgStatus, Q3) }; + q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) }; (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, @@ -1649,7 +1671,7 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), + State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } end, Quota, Q4, State). @@ -1686,11 +1708,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, ram_index_count = RamIndexCount }) -> {Delta2, Q2a, RamIndexCount2, IndexState2} = push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun bpqueue:out/1, bpqueue, Q2, + fun bpqueue:out/1, Q2, RamIndexCount, IndexState), {Delta3, Q3a, RamIndexCount3, IndexState3} = push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun q3tree:out_r/1, q3tree, Q3, + fun bpqueue:out_r/1, Q3, RamIndexCount2, IndexState2), Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), State #vqstate { q2 = Q2a, @@ -1699,19 +1721,19 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, index_state = IndexState3, ram_index_count = RamIndexCount3 }. -push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) -> - case QMod:out(Q) of +push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> + case bpqueue:out(Q) of {empty, _Q} -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} -> {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} = - QMod:out_r(Q), + bpqueue:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; false -> {Len, Qc, RamIndexCount1, IndexState1} = - push_betas_to_deltas1(Generator, Limit, Q, 0, - RamIndexCount, IndexState), + push_betas_to_deltas(Generator, Limit, Q, 0, + RamIndexCount, IndexState), {#delta { start_seq_id = Limit, count = Len, end_seq_id = MaxSeqId + 1 }, @@ -1719,7 +1741,7 @@ push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) -> end end. -push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of {empty, _Q} -> {Count, Q, RamIndexCount, IndexState}; @@ -1736,7 +1758,7 @@ push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> IndexState), {RamIndexCount - 1, IndexState2} end, - push_betas_to_deltas1( + push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. |
