diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
1 files changed, 25 insertions, 34 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f8ced680f5..1319979cf9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -677,6 +677,12 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver(SeqId, IndexState). +accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) -> + {case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> @@ -689,24 +695,17 @@ record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> - {SeqIds, GuidsByStore, PA1} = + {{SeqIds, GuidsByStore}, PA1} = dict:fold( - fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict, PAN}) -> - PAN1 = case KeepPersistent andalso IsPersistent of - true -> PAN; - false -> dict:erase(SeqId, PAN) - end, - case IsPersistent of - true -> {[SeqId | SeqIdsAcc], - rabbit_misc:dict_cons( - ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1}; - false -> {SeqIdsAcc, - rabbit_misc:dict_cons( - ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1} - end; - (SeqId, #msg_status {}, {SeqIdsAcc, Dict, PAN}) -> - {SeqIdsAcc, Dict, dict:erase(SeqId, PAN)} - end, {[], dict:new(), PA}, PA), + fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) -> + {accumulate_ack(SeqId, IsPersistent, Guid, Acc), + case KeepPersistent andalso IsPersistent of + true -> PA2; + false -> dict:erase(SeqId, PA2) + end}; + (SeqId, #msg_status {}, {Acc, PA2}) -> + {Acc, dict:erase(SeqId, PA2)} + end, {{[], dict:new()}, PA}, PA), case KeepPersistent of true -> State1 = State #vqstate { pending_ack = PA1 }, case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of @@ -854,21 +853,15 @@ ack(Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun (SeqId, {{SeqIdsAcc, Dict}, State2 = #vqstate { - pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate {pending_ack = PA }}) -> {ok, AckEntry} = dict:find(SeqId, PA), {case AckEntry of #msg_status { index_on_disk = false, %% ASSERTIONS msg_on_disk = false, is_persistent = false } -> - {SeqIdsAcc, Dict}; + Acc; {IsPersistent, Guid} -> - {case IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - rabbit_misc:dict_cons(find_msg_store(IsPersistent), - Guid, Dict)} + accumulate_ack(SeqId, IsPersistent, Guid, Acc) end, Fun(AckEntry, State2 #vqstate { pending_ack = dict:erase(SeqId, PA) })} end, {{[], dict:new()}, State}, AckTags), @@ -998,14 +991,12 @@ remove_queue_entries1( is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {Count, GuidsByStore, SeqIdsAcc, IndexState}) -> - GuidsByStore1 = - case {MsgOnDisk, IsPersistent} of - {true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, - Guid, GuidsByStore); - {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, - Guid, GuidsByStore); - {false, _} -> GuidsByStore - end, + GuidsByStore1 = case MsgOnDisk of + true -> rabbit_misc:dict_cons( + find_msg_store(IsPersistent), + Guid, GuidsByStore); + false -> GuidsByStore + end, SeqIdsAcc1 = case IndexOnDisk of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc |
