diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_recovery_indexes.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
4 files changed, 127 insertions, 28 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 045c5d587b..67459b1780 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -598,7 +598,8 @@ recover() -> Qs = rabbit_amqqueue:recover(), ok = rabbit_binding:recover(rabbit_exchange:recover(), [QName || #amqqueue{name = QName} <- Qs]), - rabbit_amqqueue:start(Qs). + rabbit_amqqueue:start(Qs), + ok = rabbit_recovery_indexes:flush(). maybe_insert_default_data() -> case rabbit_table:is_empty() of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f69d835569..3bd54d88e8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -244,24 +244,26 @@ init(Name, OnSyncFun) -> shutdown_terms(Name) -> #qistate { dir = Dir } = blank_state(Name), - case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 + case rabbit_recovery_indexes:read_recovery_terms(Dir) of + {error, _} -> []; + {ok, {_, Terms1}} -> Terms1 end. recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), State1 = State #qistate { on_sync = OnSyncFun }, - CleanShutdown = detect_clean_shutdown(Dir), + CleanShutdown = + rabbit_recovery_indexes:had_clean_shutdown(Dir), case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), init_clean(RecoveredCounts, State1); false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State) -> - {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), - store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), +terminate(Terms, State = #qistate { dir = Dir }) -> + {SegmentCounts, State1} = terminate(State), + rabbit_recovery_indexes:store_recovery_terms( + Dir, [{segments, SegmentCounts} | Terms]), State1. delete_and_terminate(State) -> @@ -358,8 +360,9 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> - DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || - Queue <- DurableQueues ]), + ok = rabbit_recovery_indexes:recover(), + DurableDict = dict:from_list([{queue_name_to_dir_name(Queue), Queue} || + Queue <- DurableQueues ]), QueuesDir = queues_dir(), QueueDirNames = all_queue_directory_names(QueuesDir), DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), @@ -370,7 +373,8 @@ recover(DurableQueues) -> case sets:is_element(QueueDirName, DurableDirectories) of true -> TermsAcc1 = - case read_shutdown_terms(QueueDirPath) of + case rabbit_recovery_indexes:read_recovery_terms( + QueueDirPath) of {error, _} -> TermsAcc; {ok, Terms} -> [Terms | TermsAcc] end, @@ -378,6 +382,8 @@ recover(DurableQueues) -> TermsAcc1}; false -> ok = rabbit_file:recursive_delete([QueueDirPath]), + rabbit_recovery_indexes:remove_recovery_terms( + QueueDirPath), {DurableAcc, TermsAcc} end end, {[], []}, QueueDirNames), @@ -410,22 +416,6 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, unconfirmed = gb_sets:new() }. -clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). - -detect_clean_shutdown(Dir) -> - case rabbit_file:delete(clean_filename(Dir)) of - ok -> true; - {error, enoent} -> false - end. - -read_shutdown_terms(Dir) -> - rabbit_file:read_term_file(clean_filename(Dir)). - -store_clean_shutdown(Terms, Dir) -> - CleanFileName = clean_filename(Dir), - ok = rabbit_file:ensure_dir(CleanFileName), - rabbit_file:write_term_file(CleanFileName, Terms). - init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. diff --git a/src/rabbit_recovery_indexes.erl b/src/rabbit_recovery_indexes.erl new file mode 100644 index 0000000000..dd7f06962d --- /dev/null +++ b/src/rabbit_recovery_indexes.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_recovery_indexes). + +-behaviour(gen_server). + +-export([recover/0, + start_link/0, + store_recovery_terms/2, + had_clean_shutdown/1, + read_recovery_terms/1, + remove_recovery_terms/1, + flush/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-ifdef(use_specs). + +-spec(recover() -> 'ok'). +-spec(start_link() -> rabbit_types:ok_pid_or_error()). +-spec(store_recovery_terms( + Name :: rabbit_misc:resource_name(), + Terms :: term()) -> rabbit_types:ok_or_error(term())). +-spec(had_clean_shutdown( + rabbit_misc:resource_name()) -> + boolean() | rabbit_types:error(term())). +-spec(read_recovery_terms( + rabbit_misc:resource_name()) -> + rabbit_types:ok_or_error2(term(), not_found)). + +-endif. % use_specs + +-include("rabbit.hrl"). +-define(SERVER, ?MODULE). +-define(CLEAN_FILENAME, "clean.dot"). + +recover() -> + {ok, _Child} = supervisor:start_child(rabbit_sup, + {?SERVER, {?MODULE, start_link, []}, + permanent, ?MAX_WAIT, worker, + [?SERVER]}), + ok. + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +store_recovery_terms(Name, Terms) -> + dets:insert(?MODULE, {Name, Terms}). + +had_clean_shutdown(Name) -> + ok == remove_recovery_terms(Name). + +read_recovery_terms(Name) -> + case dets:lookup(?MODULE, Name) of + [Terms] -> {ok, Terms}; + _ -> {error, not_found} + end. + +remove_recovery_terms(Name) -> + case dets:member(?MODULE, Name) of + true -> dets:delete(?MODULE, Name); + false -> {error, not_found} + end. + +flush() -> + ok = dets:sync(?MODULE). + +init(_) -> + File = filename:join([rabbit_mnesia:dir(), "queues", ?CLEAN_FILENAME]), + {ok, _} = dets:open_file(?MODULE, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]), + {ok, undefined}. + +handle_call(Msg, _, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok = dets:sync(?MODULE). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52d0..43e8a3b875 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -391,7 +391,7 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), start_msg_store( - [Ref || Terms <- AllTerms, + [Ref || {_, Terms} <- AllTerms, begin Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined |
