diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 230aa612d1..8e23b59146 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -584,7 +584,7 @@ dropwhile(Pred, State) -> {undefined, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + true -> {_, State2} = remove(false, MsgStatus, State1), dropwhile(Pred, State2); false -> {MsgProps, a(in_r(MsgStatus, State1))} end @@ -598,7 +598,7 @@ fetchwhile(Pred, Fun, Acc, State) -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, IsDelivered, AckTag}, State3} = - internal_fetch(true, MsgStatus1, State2), + remove(true, MsgStatus1, State2), Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), fetchwhile(Pred, Fun, Acc1, State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} @@ -613,7 +613,7 @@ fetch(AckRequired, State) -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), + {Res, State3} = remove(AckRequired, MsgStatus1, State2), {Res, a(State3)} end. @@ -623,7 +623,7 @@ drop(AckRequired, State) -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> {{_Msg, _IsDelivered, AckTag}, State2} = - internal_fetch(AckRequired, MsgStatus, State1), + remove(AckRequired, MsgStatus, State1), {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. @@ -1108,20 +1108,20 @@ read_msg(MsgStatus = #msg_status { msg = undefined, read_msg(MsgStatus, _CountDiskToRam, State) -> {MsgStatus, State}. -internal_fetch(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount }) -> +remove(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, |
