summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-27 11:31:57 +0000
committerMatthew Sackman <matthew@lshift.net>2009-10-27 11:31:57 +0000
commitc4dc225c0a2520506490ae29d01be64647ea49f7 (patch)
treebe85d7acd9d2842706345fcfbbced757801f0204 /src
parente489eaf0fc027407fb4b176fd29ac8f28ec727e0 (diff)
downloadrabbitmq-server-git-c4dc225c0a2520506490ae29d01be64647ea49f7.tar.gz
added to qi so that it will store that it was cleanly shutdown, and on startup, if it wasn't, then it marks all msgs as delivered.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl33
1 files changed, 28 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index db738857ee..4ed22cec3f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -74,6 +74,8 @@
%%
%%----------------------------------------------------------------------------
+-define(CLEAN_FILENAME, "cl.ean").
+
-define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768).
-define(ACK_JOURNAL_FILENAME, "ack_journal.jif").
-define(SEQ_BYTES, 8).
@@ -157,13 +159,15 @@
init(Name) ->
State = blank_state(Name),
- {TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State),
+ {TotalMsgCount, State1} = read_and_prune_segments(State),
scatter_journal(TotalMsgCount, State1).
terminate(State = #qistate { seg_num_handles = SegHdls }) ->
case 0 == dict:size(SegHdls) of
true -> State;
- false -> close_all_handles(State)
+ false -> State1 = #qistate { dir = Dir } = close_all_handles(State),
+ store_clean_shutdown(Dir),
+ State1
end.
terminate_and_erase(State) ->
@@ -436,6 +440,18 @@ blank_state(QueueName) ->
journal_ack_count = 0,
journal_ack_dict = dict:new(),
seg_ack_counts = dict:new() }.
+
+detect_clean_shutdown(Dir) ->
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> true;
+ {error, enoent} -> false
+ end.
+
+store_clean_shutdown(Dir) ->
+ {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME),
+ [write, raw, binary],
+ [{write_buffer, unbuffered}]),
+ ok = file_handle_cache:close(Hdl).
%%----------------------------------------------------------------------------
@@ -474,8 +490,9 @@ queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs],
%% Startup Functions
%%----------------------------------------------------------------------------
-find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
+read_and_prune_segments(State = #qistate { dir = Dir }) ->
SegNums = all_segment_nums(Dir),
+ CleanShutdown = detect_clean_shutdown(Dir),
{TotalMsgCount, State1} =
lists:foldl(
fun (SegNum, {TotalMsgCount1, StateN}) ->
@@ -484,7 +501,7 @@ find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
{TransientMsgsAcks, StateL =
#qistate { seg_ack_counts = AckCounts,
journal_ack_dict = JAckDict }} =
- deliver_transient(SegNum, SDict, StateM),
+ drop_and_deliver(SegNum, SDict, CleanShutdown, StateM),
%% ignore TransientMsgsAcks in AckCounts and
%% JAckDict1 because the TransientMsgsAcks fall
%% through into scatter_journal at which point the
@@ -543,11 +560,15 @@ replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) ->
{TotalMsgCount - length(ValidAcks),
append_acks_to_segment(SegNum, ValidAcks, State2)}.
-deliver_transient(SegNum, SDict, State) ->
+drop_and_deliver(SegNum, SDict, CleanShutdown, State) ->
{AckMe, DeliverMe} =
dict:fold(
fun (RelSeq, {MsgId, IsDelivered, true}, {AckMeAcc, DeliverMeAcc}) ->
+ %% msg is persistent, keep only if the msg_store has it
case {IsDelivered, rabbit_msg_store:contains(MsgId)} of
+ {false, true} when not CleanShutdown ->
+ %% not delivered, but dirty shutdown => mark delivered
+ {AckMeAcc, [RelSeq | DeliverMeAcc]};
{_, true} ->
{AckMeAcc, DeliverMeAcc};
{true, false} ->
@@ -556,8 +577,10 @@ deliver_transient(SegNum, SDict, State) ->
{[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]}
end;
(RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) ->
+ %% not persistent and not delivered => deliver and ack it
{[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]};
(RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) ->
+ %% not persistent but delivered => ack it
{[RelSeq | AckMeAcc], DeliverMeAcc}
end, {[], []}, SDict),
{Hdl, State1} = get_seg_handle(SegNum, State),