summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl23
1 files changed, 17 insertions, 6 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2ca5b8b8d0..ae16eb2c34 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -384,6 +384,10 @@ delete_queue_directory(Dir) ->
[ filename:join(Dir, Entry) || Entry <- Entries ]),
ok = file:del_dir(Dir).
+add_ack_to_ack_dict(SeqId, ADict) ->
+ {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
@@ -454,20 +458,27 @@ load_journal(Hdl, ADict) ->
_ErrOrEoF -> ADict
end.
-add_ack_to_ack_dict(SeqId, ADict) ->
- {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
-
replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) ->
SegPath = seg_num_to_path(Dir, SegNum),
{SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()),
ValidRelSeqIds = dict:fetch_keys(SDict),
ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds),
sets:from_list(Acks)),
- {append_acks_to_segment(SegPath, SegNum, AckCounts,
- sets:to_list(ValidAcks)),
+ AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)),
+ {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend),
TotalMsgCount - sets:size(ValidAcks), Dir}.
+deliver_and_ack_transient(SDict, Acks) ->
+ %% because an Ack entry and a Delivered entry are identical, we
+ %% simply add the RelSeq twice to the accumulator for transient
+ %% msgs that have not yet been delivered.
+ dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) ->
+ Acc;
+ (RelSeq, {_MsgId, true, false}, Acc) ->
+ [RelSeq | Acc];
+ (RelSeq, {_MsgId, false, false}, Acc) ->
+ [RelSeq, RelSeq | Acc]
+ end, Acks, SDict).
%%----------------------------------------------------------------------------
%% Loading Segments