diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-25 15:17:49 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-25 15:17:49 +0100 |
| commit | b42ea9aa3d01dc95ab03ebe8a7f6ce6015b302ac (patch) | |
| tree | f6a649be4e75a568b4fab27a696f1c2e9a35b60d /src | |
| parent | d5e3c94a05bc6ba16a6477343f851260706b12cc (diff) | |
| download | rabbitmq-server-git-b42ea9aa3d01dc95ab03ebe8a7f6ce6015b302ac.tar.gz | |
refactor 'dropwhile' and 'fetch'
- rename 'internal_queue_out' to 'queue_out' and turn it from 2nd to
1st order, thus making it more analogous to queue:out
- only assert state invariants at public API return points, not inside
helper functions
- inline 'dropwhile1' into 'dropwhile'
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 60 |
1 files changed, 30 insertions, 30 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index fd7bf2cc85..c838e2d71c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -560,19 +560,16 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), - a(State1). - -dropwhile1(Pred, State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, State2); - false -> {ok, in_r(MsgStatus, State1)} - end - end, State). + case queue_out(State) of + {empty, State1} -> + a(State1); + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> a(in_r(MsgStatus, State1)) + end + end. in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> @@ -587,23 +584,26 @@ in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. fetch(AckRequired, State) -> - internal_queue_out( - fun(MsgStatus, State1) -> - %% it's possible that the message wasn't read from disk - %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) - end, State). - -internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + %% it is possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), + {Res, a(State3)} + end. + +queue_out(State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; {{value, MsgStatus}, Q4a} -> - Fun(MsgStatus, State #vqstate { q4 = Q4a }) + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. read_msg(MsgStatus = #msg_status { msg = undefined, @@ -665,11 +665,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), {{Msg, IsDelivered, AckTag, Len1}, - a(State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1 })}. + State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 }}. ack(AckTags, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, |
