diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-23 15:29:40 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-23 15:29:40 +0000 |
| commit | 417e51a2880c0b05a54a7f3c20dc93baff93d839 (patch) | |
| tree | 1f30f345fd91d5fb5581daa02a07872aad87f2c0 | |
| parent | 78f3a848f0b345d892f8cb9f07696c69d3a05dd8 (diff) | |
| parent | 583e60d984c8eabb9de3d9e5eda9d2f1841fcca2 (diff) | |
| download | rabbitmq-server-git-417e51a2880c0b05a54a7f3c20dc93baff93d839.tar.gz | |
merge bug25303 into default
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 45 |
5 files changed, 93 insertions, 9 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a6e9b19857..9e99ca5e8c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -155,6 +155,11 @@ %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state()) + -> {A, state()}. + %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -216,7 +221,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index fe014ef556..0380885941 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -106,7 +106,8 @@ command(S) -> {1, qc_dropwhile(S)}, {1, qc_is_empty(S)}, {1, qc_timeout(S)}, - {1, qc_purge(S)}]). + {1, qc_purge(S)}, + {1, qc_fold(S)}]). qc_publish(#state{bqstate = BQ}) -> {call, ?BQMOD, publish, @@ -157,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) -> qc_purge(#state{bqstate = BQ}) -> {call, ?BQMOD, purge, [BQ]}. +qc_fold(#state{bqstate = BQ}) -> + {call, ?BQMOD, fold, [fun foldfun/2, foldacc(), BQ]}. + %% Preconditions %% Create long queues by only allowing publishing @@ -271,7 +275,11 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}. + S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}; + +next_state(S, Res, {call, ?BQMOD, fold, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}. %% Postconditions @@ -321,6 +329,12 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end, ReportedConfirmed); +postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) -> + #state{messages = Messages} = S, + lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) -> + foldfun(Msg, Acc) + end, foldacc(), gb_trees:to_list(Messages)) =:= Res; + postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> ?BQMOD:len(BQ) =:= Len. @@ -379,6 +393,9 @@ rand_choice(List, Selection, N) -> rand_choice(List -- [Picked], [Picked | Selection], N - 1). +foldfun(Msg, Acc) -> [Msg | Acc]. +foldacc() -> []. + dropfun(Props) -> Expiry = eval({call, erlang, element, [?RECORD_INDEX(expiry, message_properties), Props]}), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8e00fba51f..8fcd1893a6 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -19,7 +19,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, - requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -314,6 +314,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +fold(Fun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:fold(Fun, Acc, BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:len(BQS). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ca4ad49cf8..81180ebe15 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2216,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_publish(IsPersistent, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> lists:foldl( fun (N, VQN) -> rabbit_variable_queue:publish( @@ -2224,7 +2228,8 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), + end}, + PayloadFun(N)), PropFun(N, #message_properties{}), self(), VQN) end, VQ, lists:seq(1, Count)). @@ -2305,9 +2310,22 @@ test_variable_queue() -> fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_requeue/1]], + fun test_variable_queue_requeue/1, + fun test_variable_queue_fold/1]], passed. +test_variable_queue_fold(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1, + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish( + true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2), + true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] == + [list_to_binary(lists:reverse(P)) || + #basic_message{ content = #content{ payload_fragments_rev = P}} <- + Acc], + VQ3. + test_variable_queue_requeue(VQ0) -> Interval = 50, Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 292217daa6..e2566e10b9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,8 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, - depth/1, set_ram_duration_target/2, ram_duration/1, + dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). @@ -678,6 +678,24 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +fold(Fun, Acc, #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd }, + q3 = Q3, + q4 = Q4 } = State) -> + QFun = fun(MsgStatus, {Acc0, State0}) -> + {#msg_status { msg = Msg }, State1 } = + read_msg(MsgStatus, false, State0), + {Fun(Msg, Acc0), State1} + end, + {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), + {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), + {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2), + {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), + {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1), + {Acc5, State5}. + len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -1353,7 +1371,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue +%% Internal plumbing for requeue and fold %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1422,6 +1440,27 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> + {Acc, State}; +delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, + #vqstate { index_state = IndexState, + msg_store_clients = MSCState } = State) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, + IndexState), + {Acc1, MSCState1} = + lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, + _IsDelivered}, {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {Fun(Msg, Acc0), MSCState1} + end, {Acc, MSCState}, List), + delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1, + msg_store_clients = MSCState1 }). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
