diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-04-06 14:43:58 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-04-06 14:43:58 +0100 |
| commit | 39a0d03500351179d771ad80eec8a697ba3ae54a (patch) | |
| tree | 3ef80d59ab43cbfbc5734ea07dfb6a73b8551b30 /src | |
| parent | 9b966d0af5da4cc4c52ec583f5de981b1d81ae16 (diff) | |
| download | rabbitmq-server-git-39a0d03500351179d771ad80eec8a697ba3ae54a.tar.gz | |
In progress tweaks to VQ to support returning the dropped message in dropwhile
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
2 files changed, 31 insertions, 23 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 294fae977d..7075e01fdf 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2168,10 +2168,10 @@ test_dropwhile(VQ0) -> end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages - VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, VQ1), + {DroppedMsgs, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff7252fd68..9db5c24714 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -558,26 +558,33 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), - a(State1). + {DroppedMsgs, State1} = dropwhile1(Pred, [], State), + {DroppedMsgs, a(State1)}. + +dropwhile1(Pred, Acc, State) -> + case internal_queue_out( + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> + case Pred(MsgProps) of + true -> + {_, State2} = internal_fetch(false, MsgStatus, + State1), + dropwhile1(Pred, [MsgStatus | Acc], State2); + false -> + %% message needs to go back into Q4 (or maybe go + %% in for the first time if it was loaded from + %% Q3). Also the msg contents might not be in + %% RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + {ok, State2 #vqstate { + q4 = queue:in_r(MsgStatus1, Q4) }} + end + end, State) of + {empty, StateR} -> {Acc, StateR}; + {ok, StateR} -> {Acc, StateR}; + Result -> io:format("Got ~p~n", [Result]) + end. -dropwhile1(Pred, State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> - case Pred(MsgProps) of - true -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile1(Pred, State2); - false -> - %% message needs to go back into Q4 (or maybe go - %% in for the first time if it was loaded from - %% Q3). Also the msg contents might not be in - %% RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = - read_msg(MsgStatus, State1), - {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} - end - end, State). fetch(AckRequired, State) -> internal_queue_out( @@ -588,6 +595,7 @@ fetch(AckRequired, State) -> internal_fetch(AckRequired, MsgStatus1, State2) end, State). + internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> |
