diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 74 |
1 files changed, 37 insertions, 37 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e0cb39780b..e21b7f6f2d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -548,27 +548,29 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, ack([], State) -> State; -ack(AckTags, State = #vqstate { index_state = IndexState, - persistent_count = PCount, - pending_ack = PA }) -> - {GuidsByStore, SeqIds, PA1} = +ack(AckTags, State) -> + {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, + persistent_count = PCount }} = lists:foldl( - fun (SeqId, {Dict, SeqIds, PAN}) -> - PAN1 = dict:erase(SeqId, PAN), - case dict:find(SeqId, PAN) of + fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate { + pending_ack = PA }}) -> + PA1 = dict:erase(SeqId, PA), + State3 = State2 #vqstate { pending_ack = PA1 }, + case dict:find(SeqId, PA) of {ok, #msg_status { index_on_disk = false, %% ASSERTIONS msg_on_disk = false, is_persistent = false }} -> - {Dict, SeqIds, PAN1}; + {SeqIdsAcc, Dict, State3}; {ok, {IsPersistent, Guid}} -> - SeqIds1 = case IsPersistent of - true -> [SeqId | SeqIds]; - false -> SeqIds - end, - {rabbit_misc:dict_cons(find_msg_store(IsPersistent), - Guid, Dict), SeqIds1, PAN1} + MsgStore = find_msg_store(IsPersistent), + SeqIdsAcc1 = case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, + {SeqIdsAcc1, + rabbit_misc:dict_cons(MsgStore, Guid, Dict), State3} end - end, {dict:new(), [], PA}, AckTags), + end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) @@ -577,9 +579,8 @@ ack(AckTags, State = #vqstate { index_state = IndexState, error -> 0; {ok, Guids} -> length(Guids) end, - State #vqstate { index_state = IndexState1, - persistent_count = PCount1, - pending_ack = PA1 }. + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }. tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, @@ -630,37 +631,36 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end}. requeue(AckTags, State) -> - {SeqIds, GuidsByStore, - State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, + persistent_count = PCount }} = lists:foldl( - fun (SeqId, {SeqIdsAcc, Dict, StateN = #vqstate { - msg_store_clients = MSCStateN, - pending_ack = PAN }}) -> - PAN1 = dict:erase(SeqId, PAN), - StateN1 = StateN #vqstate { pending_ack = PAN1 }, - case dict:find(SeqId, PAN) of - {ok, #msg_status { index_on_disk = false, + fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate { + msg_store_clients = MSCState, + pending_ack = PA }}) -> + PA1 = dict:erase(SeqId, PA), + State3 = State2 #vqstate { pending_ack = PA1 }, + case dict:find(SeqId, PA) of + {ok, #msg_status { index_on_disk = false, %% ASSERTIONS msg_on_disk = false, is_persistent = false, msg = Msg }} -> - {_SeqId, StateN2} = - publish(Msg, true, false, StateN1), - {SeqIdsAcc, Dict, StateN2}; + {_SeqId, State4} = + publish(Msg, true, false, State3), + {SeqIdsAcc, Dict, State4}; {ok, {IsPersistent, Guid}} -> - {{ok, Msg = #basic_message{}}, MSCStateN1} = + {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store( - MSCStateN, IsPersistent, Guid), - StateN2 = StateN1 #vqstate { - msg_store_clients = MSCStateN1 }, - {_SeqId, StateN3} = publish(Msg, true, true, StateN2), + MSCState, IsPersistent, Guid), + State4 = State3 #vqstate { + msg_store_clients = MSCState1 }, + {_SeqId, State5} = publish(Msg, true, true, State4), MsgStore = find_msg_store(IsPersistent), SeqIdsAcc1 = case IsPersistent of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3} + rabbit_misc:dict_cons(MsgStore, Guid, Dict), State5} end end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), |
