summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl74
1 files changed, 37 insertions, 37 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e0cb39780b..e21b7f6f2d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -548,27 +548,29 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
ack([], State) ->
State;
-ack(AckTags, State = #vqstate { index_state = IndexState,
- persistent_count = PCount,
- pending_ack = PA }) ->
- {GuidsByStore, SeqIds, PA1} =
+ack(AckTags, State) ->
+ {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {Dict, SeqIds, PAN}) ->
- PAN1 = dict:erase(SeqId, PAN),
- case dict:find(SeqId, PAN) of
+ fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate {
+ pending_ack = PA }}) ->
+ PA1 = dict:erase(SeqId, PA),
+ State3 = State2 #vqstate { pending_ack = PA1 },
+ case dict:find(SeqId, PA) of
{ok, #msg_status { index_on_disk = false, %% ASSERTIONS
msg_on_disk = false,
is_persistent = false }} ->
- {Dict, SeqIds, PAN1};
+ {SeqIdsAcc, Dict, State3};
{ok, {IsPersistent, Guid}} ->
- SeqIds1 = case IsPersistent of
- true -> [SeqId | SeqIds];
- false -> SeqIds
- end,
- {rabbit_misc:dict_cons(find_msg_store(IsPersistent),
- Guid, Dict), SeqIds1, PAN1}
+ MsgStore = find_msg_store(IsPersistent),
+ SeqIdsAcc1 = case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
+ {SeqIdsAcc1,
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict), State3}
end
- end, {dict:new(), [], PA}, AckTags),
+ end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
@@ -577,9 +579,8 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
error -> 0;
{ok, Guids} -> length(Guids)
end,
- State #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- pending_ack = PA1 }.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
@@ -630,37 +631,36 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end}.
requeue(AckTags, State) ->
- {SeqIds, GuidsByStore,
- State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
+ {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {SeqIdsAcc, Dict, StateN = #vqstate {
- msg_store_clients = MSCStateN,
- pending_ack = PAN }}) ->
- PAN1 = dict:erase(SeqId, PAN),
- StateN1 = StateN #vqstate { pending_ack = PAN1 },
- case dict:find(SeqId, PAN) of
- {ok, #msg_status { index_on_disk = false,
+ fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate {
+ msg_store_clients = MSCState,
+ pending_ack = PA }}) ->
+ PA1 = dict:erase(SeqId, PA),
+ State3 = State2 #vqstate { pending_ack = PA1 },
+ case dict:find(SeqId, PA) of
+ {ok, #msg_status { index_on_disk = false, %% ASSERTIONS
msg_on_disk = false,
is_persistent = false,
msg = Msg }} ->
- {_SeqId, StateN2} =
- publish(Msg, true, false, StateN1),
- {SeqIdsAcc, Dict, StateN2};
+ {_SeqId, State4} =
+ publish(Msg, true, false, State3),
+ {SeqIdsAcc, Dict, State4};
{ok, {IsPersistent, Guid}} ->
- {{ok, Msg = #basic_message{}}, MSCStateN1} =
+ {{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(
- MSCStateN, IsPersistent, Guid),
- StateN2 = StateN1 #vqstate {
- msg_store_clients = MSCStateN1 },
- {_SeqId, StateN3} = publish(Msg, true, true, StateN2),
+ MSCState, IsPersistent, Guid),
+ State4 = State3 #vqstate {
+ msg_store_clients = MSCState1 },
+ {_SeqId, State5} = publish(Msg, true, true, State4),
MsgStore = find_msg_store(IsPersistent),
SeqIdsAcc1 = case IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end,
{SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3}
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict), State5}
end
end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),