diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-29 11:08:18 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-29 11:08:18 +0000 |
| commit | 02cf4f8c70db8516b6b809b89b776da3a4bd08d1 (patch) | |
| tree | 7f196a4665346c56a94bb05dd4cee943207c1076 | |
| parent | 423fbb589dc4fbf0cb9be363a15de8f2bfbf57b1 (diff) | |
| parent | 39c5bb32fb6f61566f977ca6fde9750216237a78 (diff) | |
| download | rabbitmq-server-git-02cf4f8c70db8516b6b809b89b776da3a4bd08d1.tar.gz | |
Merge in default, and use the fold-stop thing rather than throwing.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 50 |
6 files changed, 89 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 10efc798db..c932249e25 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1156,14 +1156,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call(sync_mirrors, From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> + S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case BQ:depth(BQS) - BQ:len(BQS) of 0 -> gen_server2:reply(From, ok), - try - BQS1 = rabbit_mirror_queue_master:sync_mirrors(BQS), - noreply(State#q{backing_queue_state = BQS1}) - catch - {time_to_shutdown, Reason} -> - {stop, Reason, State} + case rabbit_mirror_queue_master:sync_mirrors(BQS) of + {shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)}; + {ok, BQS1} -> noreply(S(BQS1)) + end end; _ -> reply({error, pending_acks}, State) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index e2945e1d9c..96c58cb9da 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -158,7 +158,8 @@ %% Fold over all the messages in a queue and return the accumulated %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), - rabbit_types:message_properties(), A) -> A), + rabbit_types:message_properties(), + A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index ed8f797abb..a5d0a00855 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -159,7 +159,7 @@ qc_purge(#state{bqstate = BQ}) -> {call, ?BQMOD, purge, [BQ]}. qc_fold(#state{bqstate = BQ}) -> - {call, ?BQMOD, fold, [fun foldfun/3, foldacc(), BQ]}. + {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}. %% Preconditions @@ -329,11 +329,14 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end, ReportedConfirmed); -postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) -> +postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) -> #state{messages = Messages} = S, - lists:foldl(fun ({_SeqId, {MsgProps, Msg}}, Acc) -> - foldfun(Msg, MsgProps, Acc) - end, foldacc(), gb_trees:to_list(Messages)) =:= Res; + {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) -> + {stop, Acc}; + ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) -> + FoldFun(Msg, MsgProps, Acc) + end, {cont, Acc0}, gb_trees:to_list(Messages)), + true = Model =:= Res; postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> ?BQMOD:len(BQ) =:= Len. @@ -393,7 +396,13 @@ rand_choice(List, Selection, N) -> rand_choice(List -- [Picked], [Picked | Selection], N - 1). -foldfun(Msg, _MsgProps, Acc) -> [Msg | Acc]. +makefoldfun(Size) -> + fun (Msg, _MsgProps, Acc) -> + case length(Acc) > Size of + false -> {cont, [Msg | Acc]}; + true -> {stop, Acc} + end + end. foldacc() -> []. dropfun(Props) -> diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index b9fb6cb665..b6d93c0d68 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -57,25 +57,28 @@ master_prepare(Ref, SPids) -> master_go(Syncer, Ref, Name, BQ, BQS) -> SendArgs = {Syncer, Ref, Name}, - {_, BQS1} = + {Acc, BQS1} = BQ:fold(fun (Msg, MsgProps, {I, Last}) -> - {I + 1, master_send(SendArgs, I, Last, Msg, MsgProps)} + master_send(SendArgs, I, Last, Msg, MsgProps) end, {0, erlang:now()}, BQS), Syncer ! {done, Ref}, - BQS1. + case Acc of + {shutdown, Reason} -> {shutdown, Reason, BQS1}; + _ -> {ok, BQS1} + end. master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> Syncer ! {msg, Ref, Msg, MsgProps}, + Acc = {I + 1, + case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of + true -> rabbit_log:info("Synchronising ~s: ~p messages~n", + [rabbit_misc:rs(Name), I]), + erlang:now(); + false -> Last + end}, receive - {msg_ok, Ref} -> ok; - {'EXIT', _Pid, Reason} -> throw({time_to_shutdown, Reason}) - end, - case timer:now_diff(erlang:now(), Last) > - ?SYNC_PROGRESS_INTERVAL of - true -> rabbit_log:info("Synchronising ~s: ~p messages~n", - [rabbit_misc:rs(Name), I]), - erlang:now(); - false -> Last + {msg_ok, Ref} -> {cont, Acc}; + {'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}} end. %% Master diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bb4ddceb9b..df8544a4ad 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2315,17 +2315,28 @@ test_variable_queue() -> passed. test_variable_queue_fold(VQ0) -> - Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1, + Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish( true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), - {Acc, VQ3} = rabbit_variable_queue:fold( - fun (M, _, A) -> [M | A] end, [], VQ2), - true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] == - [list_to_binary(lists:reverse(P)) || - #basic_message{ content = #content{ payload_fragments_rev = P}} <- - Acc], - VQ3. + lists:foldl( + fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end, + VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Count, VQ0) -> + {Acc, VQ1} = rabbit_variable_queue:fold( + fun (M, _, A) -> + case msg2int(M) =< Cut of + true -> {cont, [M | A]}; + false -> {stop, A} + end + end, [], VQ0), + true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] == + [msg2int(M) || M <- Acc], + VQ1. + +msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> + binary_to_term(list_to_binary(lists:reverse(P))). test_variable_queue_requeue(VQ0) -> Interval = 50, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7fbac782b7..30ab96f58c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -686,13 +686,15 @@ fold(Fun, Acc, #vqstate { q1 = Q1, QFun = fun(MsgStatus, {Acc0, State0}) -> {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } = read_msg(MsgStatus, false, State0), - {Fun(Msg, MsgProps, Acc0), State1} + {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopGo, {AccNext, State1}} end, - {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), - {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), - {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2), - {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), - {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1), + {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), + {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), + {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, + DeltaSeqId, DeltaSeqIdEnd, State2), + {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), + {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), {Acc5, State5}. len(#vqstate { len = Len }) -> Len. @@ -1436,9 +1438,22 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {Acc, State}; -delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, +qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; +qfoldl( Fun, {cont, Acc} = A, Q) -> + case ?QUEUE:out(Q) of + {empty, _Q} -> A; + {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + end. + +lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; +lfoldl(_Fun, {cont, _Acc} = A, []) -> A; +lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). + +delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> + {stop, {Acc, State}}; +delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> + {cont, {Acc, State}}; +delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, #vqstate { index_state = IndexState, msg_store_clients = MSCState } = State) -> DeltaSeqId1 = lists:min( @@ -1446,14 +1461,15 @@ delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Acc1, MSCState1} = - lists:foldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, - _IsDelivered}, {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {Fun(Msg, MsgProps, Acc0), MSCState1} - end, {Acc, MSCState}, List), - delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd, + {StopCont, {Acc1, MSCState1}} = + lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, + {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}} + end, {cont, {Acc, MSCState}}, List), + delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }). |
