summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-06-25 15:17:49 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-06-25 15:17:49 +0100
commitb42ea9aa3d01dc95ab03ebe8a7f6ce6015b302ac (patch)
treef6a649be4e75a568b4fab27a696f1c2e9a35b60d /src
parentd5e3c94a05bc6ba16a6477343f851260706b12cc (diff)
downloadrabbitmq-server-git-b42ea9aa3d01dc95ab03ebe8a7f6ce6015b302ac.tar.gz
refactor 'dropwhile' and 'fetch'
- rename 'internal_queue_out' to 'queue_out' and turn it from 2nd to 1st order, thus making it more analogous to queue:out - only assert state invariants at public API return points, not inside helper functions - inline 'dropwhile1' into 'dropwhile'
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl60
1 files changed, 30 insertions, 30 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index fd7bf2cc85..c838e2d71c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -560,19 +560,16 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
{gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
dropwhile(Pred, State) ->
- {_OkOrEmpty, State1} = dropwhile1(Pred, State),
- a(State1).
-
-dropwhile1(Pred, State) ->
- internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
- case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus,
- State1),
- dropwhile1(Pred, State2);
- false -> {ok, in_r(MsgStatus, State1)}
- end
- end, State).
+ case queue_out(State) of
+ {empty, State1} ->
+ a(State1);
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> a(in_r(MsgStatus, State1))
+ end
+ end.
in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
@@ -587,23 +584,26 @@ in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
fetch(AckRequired, State) ->
- internal_queue_out(
- fun(MsgStatus, State1) ->
- %% it's possible that the message wasn't read from disk
- %% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- internal_fetch(AckRequired, MsgStatus1, State2)
- end, State).
-
-internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ %% it is possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
+ {Res, a(State3)}
+ end.
+
+queue_out(State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
- {empty, State1} = Result -> a(State1), Result;
- {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1)
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
end;
{{value, MsgStatus}, Q4a} ->
- Fun(MsgStatus, State #vqstate { q4 = Q4a })
+ {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
@@ -665,11 +665,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
{{Msg, IsDelivered, AckTag, Len1},
- a(State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 })}.
+ State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 }}.
ack(AckTags, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,