diff options
| -rw-r--r-- | include/rabbit_backing_queue.hrl | 27 | ||||
| -rw-r--r-- | src/q3tree.erl | 107 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
3 files changed, 163 insertions, 39 deletions
diff --git a/include/rabbit_backing_queue.hrl b/include/rabbit_backing_queue.hrl new file mode 100644 index 0000000000..5947356d16 --- /dev/null +++ b/include/rabbit_backing_queue.hrl @@ -0,0 +1,27 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-record(msg_status, + { seq_id, + msg_id, + msg, + is_persistent, + is_delivered, + msg_on_disk, + index_on_disk, + msg_props + }). + diff --git a/src/q3tree.erl b/src/q3tree.erl new file mode 100644 index 0000000000..a5fcd74014 --- /dev/null +++ b/src/q3tree.erl @@ -0,0 +1,107 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(q3tree). + +%% A less general random access variation of bpqueue for message status records + +-export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, least_key/1, + join/2, join_bpqueue/2, foldr/3, from_batch/1, map_fold_filter_r/4]). + +-include("rabbit.hrl"). +-include("rabbit_backing_queue.hrl"). + +new() -> + gb_trees:empty(). + +is_empty(T) -> + gb_trees:is_empty(T). + +len(T) -> + gb_trees:size(T). + +in(IndexOnDisk, MsgStatus, Tree) -> in_r(IndexOnDisk, MsgStatus, Tree). + +in_r(IndexOnDisk, + #msg_status { seq_id = SeqId, index_on_disk = IndexOnDisk } = MsgStatus, + Tree) -> + gb_trees:insert(SeqId, MsgStatus, Tree); +in_r(IndexOnDisk, _Msgstatus, _Tree) -> + throw({prefix_and_msg_disagree, IndexOnDisk}). + +out(Tree) -> out1(Tree, fun gb_trees:take_smallest/1). +out_r(Tree) -> out1(Tree, fun gb_trees:take_largest/1). + +out1(Tree, TakeFun) -> + case gb_trees:is_empty(Tree) of + true -> {empty, Tree}; + false -> {_Key, #msg_status { index_on_disk = IndexOnDisk } = MsgStatus, + Tree2} = TakeFun(Tree), + {{value, IndexOnDisk, MsgStatus}, Tree2} + end. + +least_key(Tree) -> + {Least, _} = gb_trees:smallest(Tree), + Least. + +join(T1, T2) -> + join1(gb_trees:iterator(T1), T2). +join1(Iter, T) -> + case gb_trees:next(Iter) of + none -> T; + {_SeqId, + #msg_status { index_on_disk = IndexOnDisk } = MsgStatus, + Iter1} -> join1(Iter1, in_r(IndexOnDisk, MsgStatus, T)) + end. + +join_bpqueue(T, Q) -> + bpqueue:foldr(fun (IndexOnDisk, MsgStatus, Tree) -> + in_r(IndexOnDisk, MsgStatus, Tree) + end, T, Q). + +foldr(Fun, Acc, Tree) -> + lists:foldr(Fun, Acc, gb_trees:to_list(Tree)). + +from_batch({IndexOnDisk, L}) -> + lists:foldl(fun (MsgStatus, Tree) -> + in_r(IndexOnDisk, MsgStatus, Tree) + end, new(), L). + +map_fold_filter_r(PFilter, Fun, Acc, Tree) -> + map_fold_filter_r1(PFilter, Fun, Acc, Tree, new()). + +map_fold_filter_r1(PFilter, Fun, Acc, TreeOld, TreeNew) -> + case out_r(TreeOld) of + {empty, _T} -> {TreeNew, Acc}; + {{value, + IndexOnDisk, #msg_status{index_on_disk = IndexOnDisk} = MsgStatus}, + TreeOld1} -> + case PFilter(IndexOnDisk) of + false -> + map_fold_filter_r1(PFilter, Fun, Acc, TreeOld1, + in_r(IndexOnDisk, MsgStatus, TreeNew)); + true -> + case Fun(MsgStatus, Acc) of + stop -> + {join(TreeOld, TreeNew), Acc}; + {IndexOnDisk1, MsgStatus1, Acc1} -> + map_fold_filter_r1(PFilter, Fun, Acc1, TreeOld1, + in_r(IndexOnDisk1, MsgStatus1, + TreeNew)) + end + end + end. + diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0944788869..ecfe3cdac1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -264,17 +264,6 @@ -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). --record(msg_status, - { seq_id, - msg_id, - msg, - is_persistent, - is_delivered, - msg_on_disk, - index_on_disk, - msg_props - }). - -record(delta, { start_seq_id, %% start_seq_id is inclusive count, @@ -292,6 +281,7 @@ -define(TRANSIENT_MSG_STORE, msg_store_transient). -include("rabbit.hrl"). +-include("rabbit_backing_queue.hrl"). %%---------------------------------------------------------------------------- @@ -317,7 +307,7 @@ q1 :: queue(), q2 :: bpqueue:bpqueue(), delta :: delta(), - q3 :: bpqueue:bpqueue(), + q3 :: gb_tree(), q4 :: gb_tree(), next_seq_id :: seq_id(), pending_ack :: dict(), @@ -711,7 +701,7 @@ status(#vqstate { [ {q1 , queue:len(Q1)}, {q2 , bpqueue:len(Q2)}, {delta , Delta}, - {q3 , bpqueue:len(Q3)}, + {q3 , q3tree:len(Q3)}, {q4 , gb_trees:size(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, @@ -744,7 +734,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E1 = queue:is_empty(Q1), E2 = bpqueue:is_empty(Q2), ED = Delta#delta.count == 0, - E3 = bpqueue:is_empty(Q3), + E3 = q3tree:is_empty(Q3), E4 = gb_trees:is_empty(Q4), LZ = Len == 0, @@ -882,7 +872,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> Acks1} end end, {[], [], []}, List), - {bpqueue:from_list([{true, Filtered}]), + {q3tree:from_batch({true, Filtered}), rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. @@ -913,7 +903,7 @@ combine_deltas(#delta { start_seq_id = StartLow, #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. beta_fold(Fun, Init, Q) -> - bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). + q3tree:foldr(fun ({_SeqID, Value}, Acc) -> Fun(Value, Acc) end, Init, Q). update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -939,7 +929,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q1 = queue:new(), q2 = bpqueue:new(), delta = Delta, - q3 = bpqueue:new(), + q3 = q3tree:new(), q4 = gb_trees:empty(), next_seq_id = NextSeqId, pending_ack = dict:new(), @@ -982,7 +972,7 @@ in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> case gb_trees:is_empty(Q4) of true -> State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; false -> {MsgStatus1 = #msg_status{ seq_id = SeqId}, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), @@ -1072,7 +1062,7 @@ purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> - case bpqueue:is_empty(Q3) of + case q3tree:is_empty(Q3) of true -> {LensByStore, State}; false -> {LensByStore1, IndexState1} = remove_queue_entries(fun beta_fold/3, Q3, @@ -1080,7 +1070,7 @@ purge_betas_and_deltas(LensByStore, purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { - q3 = bpqueue:new(), + q3 = q3tree:new(), index_state = IndexState1 })) end. @@ -1131,7 +1121,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case bpqueue:is_empty(Q3) of + State2 = case q3tree:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; true -> State1 #vqstate { q4 = gb_trees:insert(SeqId, m(MsgStatus1), Q4) } @@ -1451,7 +1441,7 @@ limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, %% can never end up in delta due them residing in the only segment %% held by q3. {Q3a, {Quota2, IndexState2}} = limit_ram_index( - fun bpqueue:map_fold_filter_r/4, + fun q3tree:map_fold_filter_r/4, Q3, {Quota1, IndexState1}), State #vqstate { q2 = Q2a, q3 = Q3a, index_state = IndexState2, @@ -1478,7 +1468,7 @@ permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3, delta = #delta { count = DeltaCount } }) -> - BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), + BetaLen = bpqueue:len(Q2) + q3tree:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). chunk_size(Current, Permitted) @@ -1494,7 +1484,7 @@ fetch_from_q3(State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount}) -> - case bpqueue:out(Q3) of + case q3tree:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, IndexOnDisk, MsgStatus}, Q3a} -> @@ -1503,7 +1493,7 @@ fetch_from_q3(State = #vqstate { State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = - case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of + case {q3tree:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and we @@ -1544,7 +1534,7 @@ maybe_deltas_to_betas(State = #vqstate { {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, - case bpqueue:len(Q3a) of + case q3tree:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold @@ -1552,14 +1542,14 @@ maybe_deltas_to_betas(State = #vqstate { State1 #vqstate { delta = Delta #delta { start_seq_id = DeltaSeqId1 }}); Q3aLen -> - Q3b = bpqueue:join(Q3, Q3a), + Q3b = q3tree:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 State1 #vqstate { q2 = bpqueue:new(), delta = ?BLANK_DELTA, - q3 = bpqueue:join(Q3b, Q2) }; + q3 = q3tree:join_bpqueue(Q3b, Q2) }; N when N > 0 -> Delta1 = #delta { start_seq_id = DeltaSeqId1, count = N, @@ -1580,7 +1570,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State1 #vqstate { q1 = Q1a, - q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) }; + q3 = q3tree:in(IndexOnDisk, MsgStatus, Q3) }; (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, @@ -1592,7 +1582,7 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> fun q4_out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } end, Quota, Q4, State). @@ -1629,11 +1619,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, ram_index_count = RamIndexCount }) -> {Delta2, Q2a, RamIndexCount2, IndexState2} = push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun bpqueue:out/1, Q2, + fun bpqueue:out/1, bpqueue, Q2, RamIndexCount, IndexState), {Delta3, Q3a, RamIndexCount3, IndexState3} = push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun bpqueue:out_r/1, Q3, + fun q3tree:out_r/1, q3tree, Q3, RamIndexCount2, IndexState2), Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), State #vqstate { q2 = Q2a, @@ -1642,19 +1632,19 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, index_state = IndexState3, ram_index_count = RamIndexCount3 }. -push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> - case bpqueue:out(Q) of +push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) -> + case QMod:out(Q) of {empty, _Q} -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} -> {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} = - bpqueue:out_r(Q), + QMod:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; false -> {Len, Qc, RamIndexCount1, IndexState1} = - push_betas_to_deltas(Generator, Limit, Q, 0, - RamIndexCount, IndexState), + push_betas_to_deltas1(Generator, Limit, Q, 0, + RamIndexCount, IndexState), {#delta { start_seq_id = Limit, count = Len, end_seq_id = MaxSeqId + 1 }, @@ -1662,7 +1652,7 @@ push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> end end. -push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> +push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of {empty, _Q} -> {Count, Q, RamIndexCount, IndexState}; @@ -1679,7 +1669,7 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> IndexState), {RamIndexCount - 1, IndexState2} end, - push_betas_to_deltas( + push_betas_to_deltas1( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. |
