diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 54 |
4 files changed, 70 insertions, 36 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ffa716b638..071962a57b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -162,8 +162,8 @@ %% Fold over all the messages in a queue and return the accumulated %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), - rabbit_types:message_properties(), A) -> A), - A, state()) -> {A, state()}. + rabbit_types:message_properties(), A) -> + {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? -callback len(state()) -> non_neg_integer(). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 764911b9e0..982b247904 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -332,7 +332,8 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) -> #state{messages = Messages} = S, lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) -> - foldfun(Msg, MsgProps, Acc) + {cont, Acc1} = foldfun(Msg, MsgProps, Acc), + Acc1 end, foldacc(), gb_trees:to_list(Messages)) =:= Res; postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> @@ -393,7 +394,7 @@ rand_choice(List, Selection, N) -> rand_choice(List -- [Picked], [Picked | Selection], N - 1). -foldfun(Msg, _MsgProps, Acc) -> [Msg | Acc]. +foldfun(Msg, _MsgProps, Acc) -> {cont, [Msg | Acc]}. foldacc() -> []. dropfun(Props) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d5c096a197..6b45b02133 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1697,6 +1697,7 @@ test_backing_queue() -> passed = test_queue_index(), passed = test_queue_index_props(), passed = test_variable_queue(), + passed = test_variable_queue_fold(), passed = test_variable_queue_delete_msg_store_files_callback(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -2299,6 +2300,32 @@ wait_for_confirms(Unconfirmed) -> end end. +test_variable_queue_fold() -> + Count = rabbit_queue_index:next_segment_boundary(0), + [passed = with_fresh_variable_queue( + fun (VQ) -> test_variable_queue_fold_shortcut(VQ, Cut) end) || + Cut <- [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]], + passed. + +test_variable_queue_fold_shortcut(VQ0, Cut) -> + Count = rabbit_queue_index:next_segment_boundary(0), + Msg2Int = fun (#basic_message{ + content = #content{ payload_fragments_rev = P}}) -> + binary_to_term(list_to_binary(lists:reverse(P))) + end, + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish( + true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, _, A) -> + case Msg2Int(M) =< Cut of + true -> {cont, [M | A]}; + false -> {stop, A} + end + end, [], VQ2), + true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] == + [Msg2Int(M) || M <- Acc], + VQ3. + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, @@ -2310,23 +2337,9 @@ test_variable_queue() -> fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_requeue/1, - fun test_variable_queue_fold/1]], + fun test_variable_queue_requeue/1]], passed. -test_variable_queue_fold(VQ0) -> - Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1, - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish( - true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), - {Acc, VQ3} = rabbit_variable_queue:fold( - fun (M, _, A) -> [M | A] end, [], VQ2), - true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] == - [list_to_binary(lists:reverse(P)) || - #basic_message{ content = #content{ payload_fragments_rev = P}} <- - Acc], - VQ3. - test_variable_queue_requeue(VQ0) -> Interval = 50, Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b826413aee..f1b7203673 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -687,13 +687,15 @@ fold(Fun, Acc, #vqstate { q1 = Q1, QFun = fun(MsgStatus, {Acc0, State0}) -> {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } = read_msg(MsgStatus, false, State0), - {Fun(Msg, MsgProps, Acc0), State1} + {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopGo, {AccNext, State1}} end, - {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), - {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), - {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2), - {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), - {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1), + {Cont1, {Acc1, State1}} = shortcut_qfold(QFun, {cont, {Acc, State}}, Q4), + {Cont2, {Acc2, State2}} = shortcut_qfold(QFun, {Cont1, {Acc1, State1}}, Q3), + {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, + DeltaSeqId, DeltaSeqIdEnd, State2), + {Cont4, {Acc4, State4}} = shortcut_qfold(QFun, {Cont3, {Acc3, State3}}, Q2), + {_, {Acc5, State5}} = shortcut_qfold(QFun, {Cont4, {Acc4, State4}}, Q1), {Acc5, State5}. len(#vqstate { len = Len }) -> Len. @@ -1442,9 +1444,26 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {Acc, State}; -delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, +shortcut_qfold(_Fun, {stop, _Acc} = A, _Q) -> + A; +shortcut_qfold(Fun, {cont, Acc} = A, Q) -> + case ?QUEUE:out(Q) of + {empty, _Q} -> A; + {{value, V}, Q1} -> shortcut_qfold(Fun, Fun(V, Acc), Q1) + end. + +shortcut_lfold(_Fun, {stop, _Acc} = A, _List) -> + A; +shortcut_lfold(_Fun, {cont, _Acc} = A, []) -> + A; +shortcut_lfold(Fun, {cont, Acc}, [H | Rest]) -> + shortcut_lfold(Fun, Fun(H, Acc), Rest). + +delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> + {stop, {Acc, State}}; +delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> + {cont, {Acc, State}}; +delta_fold(Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, #vqstate { index_state = IndexState, msg_store_clients = MSCState } = State) -> DeltaSeqId1 = lists:min( @@ -1452,14 +1471,15 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Acc1, MSCState1} = - lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, - _IsDelivered}, {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {Fun(Msg, MsgProps, Acc0), MSCState1} - end, {Acc, MSCState}, List), - delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd, + {StopCont, {Acc1, MSCState1}} = + shortcut_lfold(fun ({MsgId, _SeqId, MsgProps, IsPersistent, + _IsDelivered}, {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}} + end, {cont, {Acc, MSCState}}, List), + delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }). |
