summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl20
1 files changed, 13 insertions, 7 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6165444409..86ed4b5739 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1159,21 +1159,27 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
{State, []};
-ack(MsgStoreFun, Fun, AckTags, State) ->
- {AckdGuids, {SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
+ack(MsgStoreFun, Fun, AckTags, State = #vqstate { pending_ack = PendAck }) ->
+ {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {Gs, Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
- {[AckEntry | Gs],
- accumulate_ack(SeqId, AckEntry, Acc),
+ {accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
pending_ack = dict:erase(SeqId, PA) })}
- end, {[], {[], orddict:new()}, State}, AckTags),
+ end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
+ AckdGuids = lists:foldl(
+ fun(SeqId, Guids) ->
+ [case dict:fetch(SeqId, PendAck) of
+ #msg_status { msg = Msg } -> Msg#basic_message.guid;
+ {_, Guid} -> Guid
+ end | Guids]
+ end, [], SeqIds),
State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),