summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-29 11:08:18 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-29 11:08:18 +0000
commit02cf4f8c70db8516b6b809b89b776da3a4bd08d1 (patch)
tree7f196a4665346c56a94bb05dd4cee943207c1076
parent423fbb589dc4fbf0cb9be363a15de8f2bfbf57b1 (diff)
parent39c5bb32fb6f61566f977ca6fde9750216237a78 (diff)
downloadrabbitmq-server-git-02cf4f8c70db8516b6b809b89b776da3a4bd08d1.tar.gz
Merge in default, and use the fold-stop thing rather than throwing.
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_backing_queue.erl3
-rw-r--r--src/rabbit_backing_queue_qc.erl21
-rw-r--r--src/rabbit_mirror_queue_sync.erl27
-rw-r--r--src/rabbit_tests.erl27
-rw-r--r--src/rabbit_variable_queue.erl50
6 files changed, 89 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 10efc798db..c932249e25 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1156,14 +1156,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call(sync_mirrors, From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case BQ:depth(BQS) - BQ:len(BQS) of
0 -> gen_server2:reply(From, ok),
- try
- BQS1 = rabbit_mirror_queue_master:sync_mirrors(BQS),
- noreply(State#q{backing_queue_state = BQS1})
- catch
- {time_to_shutdown, Reason} ->
- {stop, Reason, State}
+ case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ {shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)};
+ {ok, BQS1} -> noreply(S(BQS1))
+ end
end;
_ -> reply({error, pending_acks}, State)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index e2945e1d9c..96c58cb9da 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -158,7 +158,8 @@
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
- rabbit_types:message_properties(), A) -> A),
+ rabbit_types:message_properties(),
+ A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index ed8f797abb..a5d0a00855 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -159,7 +159,7 @@ qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
qc_fold(#state{bqstate = BQ}) ->
- {call, ?BQMOD, fold, [fun foldfun/3, foldacc(), BQ]}.
+ {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}.
%% Preconditions
@@ -329,11 +329,14 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
-postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
+postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
#state{messages = Messages} = S,
- lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) ->
- foldfun(Msg, MsgProps, Acc)
- end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
+ {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
+ {stop, Acc};
+ ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
+ FoldFun(Msg, MsgProps, Acc)
+ end, {cont, Acc0}, gb_trees:to_list(Messages)),
+ true = Model =:= Res;
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -393,7 +396,13 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
-foldfun(Msg, _MsgProps, Acc) -> [Msg | Acc].
+makefoldfun(Size) ->
+ fun (Msg, _MsgProps, Acc) ->
+ case length(Acc) > Size of
+ false -> {cont, [Msg | Acc]};
+ true -> {stop, Acc}
+ end
+ end.
foldacc() -> [].
dropfun(Props) ->
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b9fb6cb665..b6d93c0d68 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,25 +57,28 @@ master_prepare(Ref, SPids) ->
master_go(Syncer, Ref, Name, BQ, BQS) ->
SendArgs = {Syncer, Ref, Name},
- {_, BQS1} =
+ {Acc, BQS1} =
BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
- {I + 1, master_send(SendArgs, I, Last, Msg, MsgProps)}
+ master_send(SendArgs, I, Last, Msg, MsgProps)
end, {0, erlang:now()}, BQS),
Syncer ! {done, Ref},
- BQS1.
+ case Acc of
+ {shutdown, Reason} -> {shutdown, Reason, BQS1};
+ _ -> {ok, BQS1}
+ end.
master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
Syncer ! {msg, Ref, Msg, MsgProps},
+ Acc = {I + 1,
+ case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ true -> rabbit_log:info("Synchronising ~s: ~p messages~n",
+ [rabbit_misc:rs(Name), I]),
+ erlang:now();
+ false -> Last
+ end},
receive
- {msg_ok, Ref} -> ok;
- {'EXIT', _Pid, Reason} -> throw({time_to_shutdown, Reason})
- end,
- case timer:now_diff(erlang:now(), Last) >
- ?SYNC_PROGRESS_INTERVAL of
- true -> rabbit_log:info("Synchronising ~s: ~p messages~n",
- [rabbit_misc:rs(Name), I]),
- erlang:now();
- false -> Last
+ {msg_ok, Ref} -> {cont, Acc};
+ {'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}}
end.
%% Master
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb4ddceb9b..df8544a4ad 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2315,17 +2315,28 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1,
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- {Acc, VQ3} = rabbit_variable_queue:fold(
- fun (M, _, A) -> [M | A] end, [], VQ2),
- true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
- [list_to_binary(lists:reverse(P)) ||
- #basic_message{ content = #content{ payload_fragments_rev = P}} <-
- Acc],
- VQ3.
+ lists:foldl(
+ fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
+ VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Count, VQ0) ->
+ {Acc, VQ1} = rabbit_variable_queue:fold(
+ fun (M, _, A) ->
+ case msg2int(M) =< Cut of
+ true -> {cont, [M | A]};
+ false -> {stop, A}
+ end
+ end, [], VQ0),
+ true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
+ [msg2int(M) || M <- Acc],
+ VQ1.
+
+msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
+ binary_to_term(list_to_binary(lists:reverse(P))).
test_variable_queue_requeue(VQ0) ->
Interval = 50,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7fbac782b7..30ab96f58c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -686,13 +686,15 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
QFun = fun(MsgStatus, {Acc0, State0}) ->
{#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
read_msg(MsgStatus, false, State0),
- {Fun(Msg, MsgProps, Acc0), State1}
+ {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopGo, {AccNext, State1}}
end,
- {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
- {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
- {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
- {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
- {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1),
+ {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
+ {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
+ {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
+ DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
+ {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
{Acc5, State5}.
len(#vqstate { len = Len }) -> Len.
@@ -1436,9 +1438,22 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {Acc, State};
-delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
+qfoldl( Fun, {cont, Acc} = A, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> A;
+ {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ end.
+
+lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
+lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
+lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
+
+delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
+ {stop, {Acc, State}};
+delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {cont, {Acc, State}};
+delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
#vqstate { index_state = IndexState,
msg_store_clients = MSCState } = State) ->
DeltaSeqId1 = lists:min(
@@ -1446,14 +1461,15 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Acc1, MSCState1} =
- lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent,
- _IsDelivered}, {Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {Fun(Msg, MsgProps, Acc0), MSCState1}
- end, {Acc, MSCState}, List),
- delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ {StopCont, {Acc1, MSCState1}} =
+ lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
+ {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}}
+ end, {cont, {Acc, MSCState}}, List),
+ delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,
msg_store_clients = MSCState1 }).