diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-27 11:31:57 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-27 11:31:57 +0000 |
| commit | c4dc225c0a2520506490ae29d01be64647ea49f7 (patch) | |
| tree | be85d7acd9d2842706345fcfbbced757801f0204 /src | |
| parent | e489eaf0fc027407fb4b176fd29ac8f28ec727e0 (diff) | |
| download | rabbitmq-server-git-c4dc225c0a2520506490ae29d01be64647ea49f7.tar.gz | |
added to qi so that it will store that it was cleanly shutdown, and on startup, if it wasn't, then it marks all msgs as delivered.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index db738857ee..4ed22cec3f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -74,6 +74,8 @@ %% %%---------------------------------------------------------------------------- +-define(CLEAN_FILENAME, "cl.ean"). + -define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768). -define(ACK_JOURNAL_FILENAME, "ack_journal.jif"). -define(SEQ_BYTES, 8). @@ -157,13 +159,15 @@ init(Name) -> State = blank_state(Name), - {TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State), + {TotalMsgCount, State1} = read_and_prune_segments(State), scatter_journal(TotalMsgCount, State1). terminate(State = #qistate { seg_num_handles = SegHdls }) -> case 0 == dict:size(SegHdls) of true -> State; - false -> close_all_handles(State) + false -> State1 = #qistate { dir = Dir } = close_all_handles(State), + store_clean_shutdown(Dir), + State1 end. terminate_and_erase(State) -> @@ -436,6 +440,18 @@ blank_state(QueueName) -> journal_ack_count = 0, journal_ack_dict = dict:new(), seg_ack_counts = dict:new() }. + +detect_clean_shutdown(Dir) -> + case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + ok -> true; + {error, enoent} -> false + end. + +store_clean_shutdown(Dir) -> + {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME), + [write, raw, binary], + [{write_buffer, unbuffered}]), + ok = file_handle_cache:close(Hdl). %%---------------------------------------------------------------------------- @@ -474,8 +490,9 @@ queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs], %% Startup Functions %%---------------------------------------------------------------------------- -find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) -> +read_and_prune_segments(State = #qistate { dir = Dir }) -> SegNums = all_segment_nums(Dir), + CleanShutdown = detect_clean_shutdown(Dir), {TotalMsgCount, State1} = lists:foldl( fun (SegNum, {TotalMsgCount1, StateN}) -> @@ -484,7 +501,7 @@ find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) -> {TransientMsgsAcks, StateL = #qistate { seg_ack_counts = AckCounts, journal_ack_dict = JAckDict }} = - deliver_transient(SegNum, SDict, StateM), + drop_and_deliver(SegNum, SDict, CleanShutdown, StateM), %% ignore TransientMsgsAcks in AckCounts and %% JAckDict1 because the TransientMsgsAcks fall %% through into scatter_journal at which point the @@ -543,11 +560,15 @@ replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> {TotalMsgCount - length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State2)}. -deliver_transient(SegNum, SDict, State) -> +drop_and_deliver(SegNum, SDict, CleanShutdown, State) -> {AckMe, DeliverMe} = dict:fold( fun (RelSeq, {MsgId, IsDelivered, true}, {AckMeAcc, DeliverMeAcc}) -> + %% msg is persistent, keep only if the msg_store has it case {IsDelivered, rabbit_msg_store:contains(MsgId)} of + {false, true} when not CleanShutdown -> + %% not delivered, but dirty shutdown => mark delivered + {AckMeAcc, [RelSeq | DeliverMeAcc]}; {_, true} -> {AckMeAcc, DeliverMeAcc}; {true, false} -> @@ -556,8 +577,10 @@ deliver_transient(SegNum, SDict, State) -> {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]} end; (RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) -> + %% not persistent and not delivered => deliver and ack it {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]}; (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) -> + %% not persistent but delivered => ack it {[RelSeq | AckMeAcc], DeliverMeAcc} end, {[], []}, SDict), {Hdl, State1} = get_seg_handle(SegNum, State), |
