diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-14 12:25:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-14 12:25:09 +0100 |
| commit | 6f78fe2c212b11a9192c68f5be8d05734e4b3249 (patch) | |
| tree | 968852bbe0dd1fbb30c922ebf3a7352f36eccd80 | |
| parent | 01a461a749bf63a8b9ae6f5e06308c2fb9325f71 (diff) | |
| download | rabbitmq-server-git-6f78fe2c212b11a9192c68f5be8d05734e4b3249.tar.gz | |
the queue index deletes transient msgs on initialisation. This is rather elegant because it means that the delta gen fun used to seed the msg store does not generate any deltas for transient msgs, which means that the msg_store will take care of deleting transient msgs without any further interaction.
| -rw-r--r-- | src/rabbit_queue_index.erl | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 2ca5b8b8d0..ae16eb2c34 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -384,6 +384,10 @@ delete_queue_directory(Dir) -> [ filename:join(Dir, Entry) || Entry <- Entries ]), ok = file:del_dir(Dir). +add_ack_to_ack_dict(SeqId, ADict) -> + {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- @@ -454,20 +458,27 @@ load_journal(Hdl, ADict) -> _ErrOrEoF -> ADict end. -add_ack_to_ack_dict(SeqId, ADict) -> - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). - replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) -> SegPath = seg_num_to_path(Dir, SegNum), {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks)), - {append_acks_to_segment(SegPath, SegNum, AckCounts, - sets:to_list(ValidAcks)), + AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)), + {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend), TotalMsgCount - sets:size(ValidAcks), Dir}. +deliver_and_ack_transient(SDict, Acks) -> + %% because an Ack entry and a Delivered entry are identical, we + %% simply add the RelSeq twice to the accumulator for transient + %% msgs that have not yet been delivered. + dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) -> + Acc; + (RelSeq, {_MsgId, true, false}, Acc) -> + [RelSeq | Acc]; + (RelSeq, {_MsgId, false, false}, Acc) -> + [RelSeq, RelSeq | Acc] + end, Acks, SDict). %%---------------------------------------------------------------------------- %% Loading Segments |
