diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 2ca5b8b8d0..ae16eb2c34 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -384,6 +384,10 @@ delete_queue_directory(Dir) -> [ filename:join(Dir, Entry) || Entry <- Entries ]), ok = file:del_dir(Dir). +add_ack_to_ack_dict(SeqId, ADict) -> + {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- @@ -454,20 +458,27 @@ load_journal(Hdl, ADict) -> _ErrOrEoF -> ADict end. -add_ack_to_ack_dict(SeqId, ADict) -> - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). - replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) -> SegPath = seg_num_to_path(Dir, SegNum), {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks)), - {append_acks_to_segment(SegPath, SegNum, AckCounts, - sets:to_list(ValidAcks)), + AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)), + {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend), TotalMsgCount - sets:size(ValidAcks), Dir}. +deliver_and_ack_transient(SDict, Acks) -> + %% because an Ack entry and a Delivered entry are identical, we + %% simply add the RelSeq twice to the accumulator for transient + %% msgs that have not yet been delivered. + dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) -> + Acc; + (RelSeq, {_MsgId, true, false}, Acc) -> + [RelSeq | Acc]; + (RelSeq, {_MsgId, false, false}, Acc) -> + [RelSeq, RelSeq | Acc] + end, Acks, SDict). %%---------------------------------------------------------------------------- %% Loading Segments |
