summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-04-06 14:43:58 +0100
committerRob Harrop <rob@rabbitmq.com>2011-04-06 14:43:58 +0100
commit39a0d03500351179d771ad80eec8a697ba3ae54a (patch)
tree3ef80d59ab43cbfbc5734ea07dfb6a73b8551b30 /src
parent9b966d0af5da4cc4c52ec583f5de981b1d81ae16 (diff)
downloadrabbitmq-server-git-39a0d03500351179d771ad80eec8a697ba3ae54a.tar.gz
In progress tweaks to VQ to support returning the dropped message in dropwhile
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl46
2 files changed, 31 insertions, 23 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 294fae977d..7075e01fdf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2168,10 +2168,10 @@ test_dropwhile(VQ0) ->
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, VQ1),
+ {DroppedMsgs, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff7252fd68..9db5c24714 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -558,26 +558,33 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
{gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
dropwhile(Pred, State) ->
- {_OkOrEmpty, State1} = dropwhile1(Pred, State),
- a(State1).
+ {DroppedMsgs, State1} = dropwhile1(Pred, [], State),
+ {DroppedMsgs, a(State1)}.
+
+dropwhile1(Pred, Acc, State) ->
+ case internal_queue_out(
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
+ case Pred(MsgProps) of
+ true ->
+ {_, State2} = internal_fetch(false, MsgStatus,
+ State1),
+ dropwhile1(Pred, [MsgStatus | Acc], State2);
+ false ->
+ %% message needs to go back into Q4 (or maybe go
+ %% in for the first time if it was loaded from
+ %% Q3). Also the msg contents might not be in
+ %% RAM, so read them in now
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ read_msg(MsgStatus, State1),
+ {ok, State2 #vqstate {
+ q4 = queue:in_r(MsgStatus1, Q4) }}
+ end
+ end, State) of
+ {empty, StateR} -> {Acc, StateR};
+ {ok, StateR} -> {Acc, StateR};
+ Result -> io:format("Got ~p~n", [Result])
+ end.
-dropwhile1(Pred, State) ->
- internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
- case Pred(MsgProps) of
- true ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile1(Pred, State2);
- false ->
- %% message needs to go back into Q4 (or maybe go
- %% in for the first time if it was loaded from
- %% Q3). Also the msg contents might not be in
- %% RAM, so read them in now
- {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
- read_msg(MsgStatus, State1),
- {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }}
- end
- end, State).
fetch(AckRequired, State) ->
internal_queue_out(
@@ -588,6 +595,7 @@ fetch(AckRequired, State) ->
internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
+
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->