summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-14 12:25:09 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-14 12:25:09 +0100
commit6f78fe2c212b11a9192c68f5be8d05734e4b3249 (patch)
tree968852bbe0dd1fbb30c922ebf3a7352f36eccd80
parent01a461a749bf63a8b9ae6f5e06308c2fb9325f71 (diff)
downloadrabbitmq-server-git-6f78fe2c212b11a9192c68f5be8d05734e4b3249.tar.gz
the queue index deletes transient msgs on initialisation. This is rather elegant because it means that the delta gen fun used to seed the msg store does not generate any deltas for transient msgs, which means that the msg_store will take care of deleting transient msgs without any further interaction.
-rw-r--r--src/rabbit_queue_index.erl23
1 files changed, 17 insertions, 6 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2ca5b8b8d0..ae16eb2c34 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -384,6 +384,10 @@ delete_queue_directory(Dir) ->
[ filename:join(Dir, Entry) || Entry <- Entries ]),
ok = file:del_dir(Dir).
+add_ack_to_ack_dict(SeqId, ADict) ->
+ {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
@@ -454,20 +458,27 @@ load_journal(Hdl, ADict) ->
_ErrOrEoF -> ADict
end.
-add_ack_to_ack_dict(SeqId, ADict) ->
- {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
-
replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) ->
SegPath = seg_num_to_path(Dir, SegNum),
{SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()),
ValidRelSeqIds = dict:fetch_keys(SDict),
ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds),
sets:from_list(Acks)),
- {append_acks_to_segment(SegPath, SegNum, AckCounts,
- sets:to_list(ValidAcks)),
+ AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)),
+ {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend),
TotalMsgCount - sets:size(ValidAcks), Dir}.
+deliver_and_ack_transient(SDict, Acks) ->
+ %% because an Ack entry and a Delivered entry are identical, we
+ %% simply add the RelSeq twice to the accumulator for transient
+ %% msgs that have not yet been delivered.
+ dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) ->
+ Acc;
+ (RelSeq, {_MsgId, true, false}, Acc) ->
+ [RelSeq | Acc];
+ (RelSeq, {_MsgId, false, false}, Acc) ->
+ [RelSeq, RelSeq | Acc]
+ end, Acks, SDict).
%%----------------------------------------------------------------------------
%% Loading Segments