summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl26
1 files changed, 10 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 04dcf88d0c..741687732d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1052,12 +1052,8 @@ record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->
- {SeqIds, GuidsByStore} =
- dict:fold(fun (SeqId, {IsPersistent, Guid}, Acc) ->
- accumulate_ack(SeqId, IsPersistent, Guid, Acc);
- (_SeqId, #msg_status {}, Acc) ->
- Acc
- end, {[], dict:new()}, PA),
+ {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
+ {[], dict:new()}, PA),
State1 = State #vqstate { pending_ack = dict:new() },
case KeepPersistent of
true -> case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
@@ -1081,15 +1077,9 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
lists:foldl(
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
- {case AckEntry of
- #msg_status { index_on_disk = false, %% ASSERTIONS
- msg_on_disk = false,
- is_persistent = false } ->
- Acc;
- {IsPersistent, Guid} ->
- accumulate_ack(SeqId, IsPersistent, Guid, Acc)
- end, Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
+ {accumulate_ack(SeqId, AckEntry, Acc),
+ Fun(AckEntry, State2 #vqstate {
+ pending_ack = dict:erase(SeqId, PA) })}
end, {{[], dict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
@@ -1102,7 +1092,11 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
-accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) ->
+accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
+ msg_on_disk = false,
+ index_on_disk = false }, Acc) ->
+ Acc;
+accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
{case IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc