diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 28e74537bf..7051ea0585 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -71,6 +71,11 @@ -define(BATCH_SIZE, 10000). -define(CACHE_MAX_SIZE, 10485760). -define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]). +-define(SHUTDOWN_MESSAGE_KEY, shutdown_token). +-define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, + msg_id = infinity_and_beyond, + is_delivered = never + }). -define(SERVER, ?MODULE). @@ -392,6 +397,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> E -> E end, + ok = detect_shutdown_state_and_adjust_delivered_flags(), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), {ok, MsgLocationDets} = @@ -582,6 +589,7 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, _ -> sync_current_file_handle(State), file:close(FileHdl) end, + store_safe_shutdown(), HC1 = rabbit_file_handle_cache:close_all(HC), State1 #dqstate { current_file_handle = undefined, current_dirty = false, @@ -854,6 +862,72 @@ msg_to_bin(Msg = #basic_message { content = Content }) -> bin_to_msg(MsgBin) -> binary_to_term(MsgBin). + +store_safe_shutdown() -> + ok = rabbit_misc:execute_mnesia_transaction( + fun() -> + mnesia:write(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE, write) + end). + +detect_shutdown_state_and_adjust_delivered_flags() -> + MarkDelivered = + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:read(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE_KEY, read) of + [?SHUTDOWN_MESSAGE] -> + mnesia:delete(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE_KEY, write), + false; + [] -> + true + end + end), + %% if we crash here, then on startup we'll not find the + %% SHUTDOWN_MESSAGE so will mark everything delivered, which is + %% the safe thing to do. + case MarkDelivered of + true -> mark_messages_delivered(); + false -> ok + end. + +mark_messages_delivered() -> + mark_message_delivered('$start_of_table'). + +%% A single huge transaction is a bad idea because of memory +%% use. Equally, using dirty operations is a bad idea because you +%% shouldn't do writes when doing mnesia:dirty_next, because the +%% ordering can change. So we use transactions of bounded +%% size. However, even this does necessitate restarting between +%% transactions. +mark_message_delivered('$end_of_table') -> + ok; +mark_message_delivered(_Key) -> + mark_message_delivered( + rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + mark_message_delivered(mnesia:first(rabbit_disk_queue), + ?BATCH_SIZE) + end)). + +mark_message_delivered(Key, 0) -> + Key; +mark_message_delivered(Key = '$end_of_table', _N) -> + Key; +mark_message_delivered(Key, N) -> + [Obj] = mnesia:read(rabbit_disk_queue, Key, write), + M = case Obj #dq_msg_loc.is_delivered of + true -> N; + false -> + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { is_delivered = true }, + write), + N - 1 + end, + mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). + %%---------------------------------------------------------------------------- %% internal functions %%---------------------------------------------------------------------------- |
