summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-29 22:51:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-29 22:51:37 +0000
commite3fe968b5a5df583b6d0cfeca687e1dc9b7a8893 (patch)
tree8df50f40a9c6fa6395831ff3cef759f1fc637a73
parent39c5bb32fb6f61566f977ca6fde9750216237a78 (diff)
downloadrabbitmq-server-git-e3fe968b5a5df583b6d0cfeca687e1dc9b7a8893.tar.gz
return to a simpler, better BQ:dropwhile, and introduce 'fetchwhile'
...to cover the remaining required functionality, including the ability to process messages along the way, pass around an accumulator, and get hold of the IsDelivered flag (not needed in our use case but included for similarity with 'fetch').
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_backing_queue.erl32
-rw-r--r--src/rabbit_backing_queue_qc.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl46
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl40
6 files changed, 87 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 74717acee5..a6b3829b77 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -725,14 +725,15 @@ drop_expired_messages(State = #q{dlx = DLX,
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> {Next, Msgs, BQS2} =
+ BQ:fetchwhile(ExpirePred,
+ fun accumulate_msgs/4,
+ [], BQS),
case Msgs of
[] -> ok;
- _ -> (dead_letter_fun(expired))(Msgs)
+ _ -> (dead_letter_fun(expired))(
+ lists:reverse(Msgs))
end,
{Next, BQS2}
end,
@@ -741,6 +742,8 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
+accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 96c58cb9da..272df5c1b7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,16 +124,25 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their Delivered
+%% flag and ack tag, to the supplied function. The function is also
+%% fed an accumulator. The result of fetchwhile is as for dropwhile
+%% plus the accumulator.
+-callback fetchwhile(msg_pred(),
+ fun ((rabbit_types:basic_message(), boolean(), ack(), A)
+ -> A),
+ A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
@@ -222,7 +231,8 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a5d0a00855..e337580cd4 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -147,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -262,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c8a361b1e0..0ae10d89b8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -20,7 +20,7 @@
purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -216,19 +216,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -268,7 +266,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
empty ->
{Result, State1};
{#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
- {Result, drop(MsgId, AckTag, State1)}
+ {Result, drop_one(MsgId, AckTag, State1)}
end.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -277,7 +275,7 @@ drop(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ {MsgId, AckTag} -> drop_one(MsgId, AckTag, State1)
end}.
ack(AckTags, State = #state { gm = GM,
@@ -440,13 +438,23 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+drop_one(MsgId, AckTag, State = #state { ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
+
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df8544a4ad..d6d40b14b0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2418,10 +2418,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2438,12 +2438,10 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, VQ3} = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ {_, VQ6} = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30ab96f58c..3e4c7c864f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
@@ -577,27 +578,30 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _IsDelivered, AckTag}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
+ fetchwhile(Pred, Fun, Acc1, State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.