summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-23 15:29:40 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-23 15:29:40 +0000
commit417e51a2880c0b05a54a7f3c20dc93baff93d839 (patch)
tree1f30f345fd91d5fb5581daa02a07872aad87f2c0
parent78f3a848f0b345d892f8cb9f07696c69d3a05dd8 (diff)
parent583e60d984c8eabb9de3d9e5eda9d2f1841fcca2 (diff)
downloadrabbitmq-server-git-417e51a2880c0b05a54a7f3c20dc93baff93d839.tar.gz
merge bug25303 into default
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_backing_queue_qc.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl7
-rw-r--r--src/rabbit_tests.erl22
-rw-r--r--src/rabbit_variable_queue.erl45
5 files changed, 93 insertions, 9 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a6e9b19857..9e99ca5e8c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -155,6 +155,11 @@
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state())
+ -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -216,7 +221,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index fe014ef556..0380885941 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -106,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -157,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [fun foldfun/2, foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -271,7 +275,11 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
@@ -321,6 +329,12 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
+ #state{messages = Messages} = S,
+ lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) ->
+ foldfun(Msg, Acc)
+ end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -379,6 +393,9 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+foldfun(Msg, Acc) -> [Msg | Acc].
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8e00fba51f..8fcd1893a6 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -19,7 +19,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -314,6 +314,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ca4ad49cf8..81180ebe15 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2216,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2224,7 +2228,8 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
+ end},
+ PayloadFun(N)),
PropFun(N, #message_properties{}), self(), VQN)
end, VQ, lists:seq(1, Count)).
@@ -2305,9 +2310,22 @@ test_variable_queue() ->
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
+test_variable_queue_fold(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2),
+ true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
+ [list_to_binary(lists:reverse(P)) ||
+ #basic_message{ content = #content{ payload_fragments_rev = P}} <-
+ Acc],
+ VQ3.
+
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 292217daa6..e2566e10b9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
@@ -678,6 +678,24 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
+ q3 = Q3,
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {#msg_status { msg = Msg }, State1 } =
+ read_msg(MsgStatus, false, State0),
+ {Fun(Msg, Acc0), State1}
+ end,
+ {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
+ {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
+ {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
+ {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1),
+ {Acc5, State5}.
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -1353,7 +1371,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1422,6 +1440,27 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {Acc, State};
+delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Acc1, MSCState1} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ _IsDelivered}, {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {Fun(Msg, Acc0), MSCState1}
+ end, {Acc, MSCState}, List),
+ delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------