summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-28 17:58:10 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-28 17:58:10 +0100
commita30c1802f985ccfc14aafe568df8e00bd792ab85 (patch)
treefb3083e851f7aa420655794fbd67668ac5c32823
parentcdf508e5815e290c836bb60a4f6d8b235a8f69c9 (diff)
downloadrabbitmq-server-git-a30c1802f985ccfc14aafe568df8e00bd792ab85.tar.gz
Use an mnesia transaction to record safely shutting down, and associated wiring on startup. Manually verified this all works.
-rw-r--r--src/rabbit_disk_queue.erl74
1 files changed, 74 insertions, 0 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 28e74537bf..7051ea0585 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -71,6 +71,11 @@
-define(BATCH_SIZE, 10000).
-define(CACHE_MAX_SIZE, 10485760).
-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]).
+-define(SHUTDOWN_MESSAGE_KEY, shutdown_token).
+-define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
+ msg_id = infinity_and_beyond,
+ is_delivered = never
+ }).
-define(SERVER, ?MODULE).
@@ -392,6 +397,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
E -> E
end,
+ ok = detect_shutdown_state_and_adjust_delivered_flags(),
+
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
{ok, MsgLocationDets} =
@@ -582,6 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
_ -> sync_current_file_handle(State),
file:close(FileHdl)
end,
+ store_safe_shutdown(),
HC1 = rabbit_file_handle_cache:close_all(HC),
State1 #dqstate { current_file_handle = undefined,
current_dirty = false,
@@ -854,6 +862,72 @@ msg_to_bin(Msg = #basic_message { content = Content }) ->
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
+
+store_safe_shutdown() ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:write(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE, write)
+ end).
+
+detect_shutdown_state_and_adjust_delivered_flags() ->
+ MarkDelivered =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:read(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE_KEY, read) of
+ [?SHUTDOWN_MESSAGE] ->
+ mnesia:delete(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE_KEY, write),
+ false;
+ [] ->
+ true
+ end
+ end),
+ %% if we crash here, then on startup we'll not find the
+ %% SHUTDOWN_MESSAGE so will mark everything delivered, which is
+ %% the safe thing to do.
+ case MarkDelivered of
+ true -> mark_messages_delivered();
+ false -> ok
+ end.
+
+mark_messages_delivered() ->
+ mark_message_delivered('$start_of_table').
+
+%% A single huge transaction is a bad idea because of memory
+%% use. Equally, using dirty operations is a bad idea because you
+%% shouldn't do writes when doing mnesia:dirty_next, because the
+%% ordering can change. So we use transactions of bounded
+%% size. However, even this does necessitate restarting between
+%% transactions.
+mark_message_delivered('$end_of_table') ->
+ ok;
+mark_message_delivered(_Key) ->
+ mark_message_delivered(
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ mark_message_delivered(mnesia:first(rabbit_disk_queue),
+ ?BATCH_SIZE)
+ end)).
+
+mark_message_delivered(Key, 0) ->
+ Key;
+mark_message_delivered(Key = '$end_of_table', _N) ->
+ Key;
+mark_message_delivered(Key, N) ->
+ [Obj] = mnesia:read(rabbit_disk_queue, Key, write),
+ M = case Obj #dq_msg_loc.is_delivered of
+ true -> N;
+ false ->
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { is_delivered = true },
+ write),
+ N - 1
+ end,
+ mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
+
%%----------------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------------