summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-28 14:37:44 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-28 14:37:44 +0000
commit836e7f3709527e4fb4c8576d6c151e8b2614b15e (patch)
tree393458463f46f585ecdd803733a2d8b1d5bf50b4
parent3b58d435d3a5fe4775d810af4ed4c53a5034def7 (diff)
downloadrabbitmq-server-git-836e7f3709527e4fb4c8576d6c151e8b2614b15e.tar.gz
Provide early termination for backing queue fold
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_backing_queue_qc.erl5
-rw-r--r--src/rabbit_tests.erl43
-rw-r--r--src/rabbit_variable_queue.erl54
4 files changed, 70 insertions, 36 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ffa716b638..071962a57b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -162,8 +162,8 @@
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
- rabbit_types:message_properties(), A) -> A),
- A, state()) -> {A, state()}.
+ rabbit_types:message_properties(), A) ->
+ {('stop' | 'cont'), A}), A, state()) -> {A, state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 764911b9e0..982b247904 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -332,7 +332,8 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
#state{messages = Messages} = S,
lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) ->
- foldfun(Msg, MsgProps, Acc)
+ {cont, Acc1} = foldfun(Msg, MsgProps, Acc),
+ Acc1
end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
@@ -393,7 +394,7 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
-foldfun(Msg, _MsgProps, Acc) -> [Msg | Acc].
+foldfun(Msg, _MsgProps, Acc) -> {cont, [Msg | Acc]}.
foldacc() -> [].
dropfun(Props) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d5c096a197..6b45b02133 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1697,6 +1697,7 @@ test_backing_queue() ->
passed = test_queue_index(),
passed = test_queue_index_props(),
passed = test_variable_queue(),
+ passed = test_variable_queue_fold(),
passed = test_variable_queue_delete_msg_store_files_callback(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
@@ -2299,6 +2300,32 @@ wait_for_confirms(Unconfirmed) ->
end
end.
+test_variable_queue_fold() ->
+ Count = rabbit_queue_index:next_segment_boundary(0),
+ [passed = with_fresh_variable_queue(
+ fun (VQ) -> test_variable_queue_fold_shortcut(VQ, Cut) end) ||
+ Cut <- [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]],
+ passed.
+
+test_variable_queue_fold_shortcut(VQ0, Cut) ->
+ Count = rabbit_queue_index:next_segment_boundary(0),
+ Msg2Int = fun (#basic_message{
+ content = #content{ payload_fragments_rev = P}}) ->
+ binary_to_term(list_to_binary(lists:reverse(P)))
+ end,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, _, A) ->
+ case Msg2Int(M) =< Cut of
+ true -> {cont, [M | A]};
+ false -> {stop, A}
+ end
+ end, [], VQ2),
+ true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
+ [Msg2Int(M) || M <- Acc],
+ VQ3.
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
@@ -2310,23 +2337,9 @@ test_variable_queue() ->
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1,
- fun test_variable_queue_fold/1]],
+ fun test_variable_queue_requeue/1]],
passed.
-test_variable_queue_fold(VQ0) ->
- Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1,
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(
- true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- {Acc, VQ3} = rabbit_variable_queue:fold(
- fun (M, _, A) -> [M | A] end, [], VQ2),
- true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
- [list_to_binary(lists:reverse(P)) ||
- #basic_message{ content = #content{ payload_fragments_rev = P}} <-
- Acc],
- VQ3.
-
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b826413aee..f1b7203673 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -687,13 +687,15 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
QFun = fun(MsgStatus, {Acc0, State0}) ->
{#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
read_msg(MsgStatus, false, State0),
- {Fun(Msg, MsgProps, Acc0), State1}
+ {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopGo, {AccNext, State1}}
end,
- {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
- {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
- {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
- {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
- {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1),
+ {Cont1, {Acc1, State1}} = shortcut_qfold(QFun, {cont, {Acc, State}}, Q4),
+ {Cont2, {Acc2, State2}} = shortcut_qfold(QFun, {Cont1, {Acc1, State1}}, Q3),
+ {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
+ DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Cont4, {Acc4, State4}} = shortcut_qfold(QFun, {Cont3, {Acc3, State3}}, Q2),
+ {_, {Acc5, State5}} = shortcut_qfold(QFun, {Cont4, {Acc4, State4}}, Q1),
{Acc5, State5}.
len(#vqstate { len = Len }) -> Len.
@@ -1442,9 +1444,26 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {Acc, State};
-delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+shortcut_qfold(_Fun, {stop, _Acc} = A, _Q) ->
+ A;
+shortcut_qfold(Fun, {cont, Acc} = A, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> A;
+ {{value, V}, Q1} -> shortcut_qfold(Fun, Fun(V, Acc), Q1)
+ end.
+
+shortcut_lfold(_Fun, {stop, _Acc} = A, _List) ->
+ A;
+shortcut_lfold(_Fun, {cont, _Acc} = A, []) ->
+ A;
+shortcut_lfold(Fun, {cont, Acc}, [H | Rest]) ->
+ shortcut_lfold(Fun, Fun(H, Acc), Rest).
+
+delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
+ {stop, {Acc, State}};
+delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {cont, {Acc, State}};
+delta_fold(Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
#vqstate { index_state = IndexState,
msg_store_clients = MSCState } = State) ->
DeltaSeqId1 = lists:min(
@@ -1452,14 +1471,15 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Acc1, MSCState1} =
- lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent,
- _IsDelivered}, {Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {Fun(Msg, MsgProps, Acc0), MSCState1}
- end, {Acc, MSCState}, List),
- delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ {StopCont, {Acc1, MSCState1}} =
+ shortcut_lfold(fun ({MsgId, _SeqId, MsgProps, IsPersistent,
+ _IsDelivered}, {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}}
+ end, {cont, {Acc, MSCState}}, List),
+ delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,
msg_store_clients = MSCState1 }).