diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e06f32bf42..1aba634367 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -438,7 +438,7 @@ terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(true, State), + purge_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), @@ -457,12 +457,12 @@ terminate(_Reason, State) -> %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do - %% as part of 'purge' and 'remove_pending_ack', other than + %% as part of 'purge' and 'purge_pending_ack', other than %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(false, State1), + purge_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; @@ -574,13 +574,9 @@ ack(AckTags, State) -> persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, - ram_ack_index = RAI }}) -> - AckEntry = dict:fetch(SeqId, PA), - {accumulate_ack(SeqId, AckEntry, Acc), - State2 #vqstate { - pending_ack = dict:erase(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI)}} + fun (SeqId, {Acc, State2}) -> + {AckEntry, State3} = remove_pending_ack(SeqId, State2), + {accumulate_ack(SeqId, AckEntry, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) @@ -1215,10 +1211,16 @@ record_pending_ack(#msg_status { seq_id = SeqId, ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. -remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState, - msg_store_clients = MSCState }) -> +remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + {dict:fetch(SeqId, PA), + State #vqstate { pending_ack = dict:erase(SeqId, PA), + ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. + +purge_pending_ack(KeepPersistent, + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), @@ -1422,23 +1424,21 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, MsgPropsFun, - #vqstate { pending_ack = PA, - ram_ack_index = RAI } = State) -> - State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI)}, +msg_from_pending_ack(SeqId, MsgPropsFun, State) -> + {AckEntry, State1} = remove_pending_ack(SeqId, State), #msg_status { msg_props = MsgProps1 } = MsgStatus1 = - case dict:fetch(SeqId, PA) of + case AckEntry of {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> - #msg_status { seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = true, - msg_on_disk = true, - index_on_disk = IndexOnDisk, - msg_props = MsgProps }; - #msg_status{} = MsgStatus0 -> MsgStatus0 + m(#msg_status { seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = true, + msg_on_disk = true, + index_on_disk = IndexOnDisk, + msg_props = MsgProps }); + #msg_status{} = MsgStatus -> + MsgStatus end, {MsgStatus1 #msg_status { msg_props = (MsgPropsFun(MsgProps1)) #message_properties { |
