summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_queue_index.erl42
-rw-r--r--src/rabbit_recovery_indexes.erl108
-rw-r--r--src/rabbit_variable_queue.erl2
4 files changed, 127 insertions, 28 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 045c5d587b..67459b1780 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -598,7 +598,8 @@ recover() ->
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs).
+ rabbit_amqqueue:start(Qs),
+ ok = rabbit_recovery_indexes:flush().
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f69d835569..3bd54d88e8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -244,24 +244,26 @@ init(Name, OnSyncFun) ->
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case read_shutdown_terms(Dir) of
- {error, _} -> [];
- {ok, Terms1} -> Terms1
+ case rabbit_recovery_indexes:read_recovery_terms(Dir) of
+ {error, _} -> [];
+ {ok, {_, Terms1}} -> Terms1
end.
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown = detect_clean_shutdown(Dir),
+ CleanShutdown =
+ rabbit_recovery_indexes:had_clean_shutdown(Dir),
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State) ->
- {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+terminate(Terms, State = #qistate { dir = Dir }) ->
+ {SegmentCounts, State1} = terminate(State),
+ rabbit_recovery_indexes:store_recovery_terms(
+ Dir, [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -358,8 +360,9 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
+ ok = rabbit_recovery_indexes:recover(),
+ DurableDict = dict:from_list([{queue_name_to_dir_name(Queue), Queue} ||
+ Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
@@ -370,7 +373,8 @@ recover(DurableQueues) ->
case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case read_shutdown_terms(QueueDirPath) of
+ case rabbit_recovery_indexes:read_recovery_terms(
+ QueueDirPath) of
{error, _} -> TermsAcc;
{ok, Terms} -> [Terms | TermsAcc]
end,
@@ -378,6 +382,8 @@ recover(DurableQueues) ->
TermsAcc1};
false ->
ok = rabbit_file:recursive_delete([QueueDirPath]),
+ rabbit_recovery_indexes:remove_recovery_terms(
+ QueueDirPath),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
@@ -410,22 +416,6 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unconfirmed = gb_sets:new() }.
-clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
-
-detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_filename(Dir)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_filename(Dir)).
-
-store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_filename(Dir),
- ok = rabbit_file:ensure_dir(CleanFileName),
- rabbit_file:write_term_file(CleanFileName, Terms).
-
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.
diff --git a/src/rabbit_recovery_indexes.erl b/src/rabbit_recovery_indexes.erl
new file mode 100644
index 0000000000..dd7f06962d
--- /dev/null
+++ b/src/rabbit_recovery_indexes.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_recovery_indexes).
+
+-behaviour(gen_server).
+
+-export([recover/0,
+ start_link/0,
+ store_recovery_terms/2,
+ had_clean_shutdown/1,
+ read_recovery_terms/1,
+ remove_recovery_terms/1,
+ flush/0]).
+
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-ifdef(use_specs).
+
+-spec(recover() -> 'ok').
+-spec(start_link() -> rabbit_types:ok_pid_or_error()).
+-spec(store_recovery_terms(
+ Name :: rabbit_misc:resource_name(),
+ Terms :: term()) -> rabbit_types:ok_or_error(term())).
+-spec(had_clean_shutdown(
+ rabbit_misc:resource_name()) ->
+ boolean() | rabbit_types:error(term())).
+-spec(read_recovery_terms(
+ rabbit_misc:resource_name()) ->
+ rabbit_types:ok_or_error2(term(), not_found)).
+
+-endif. % use_specs
+
+-include("rabbit.hrl").
+-define(SERVER, ?MODULE).
+-define(CLEAN_FILENAME, "clean.dot").
+
+recover() ->
+ {ok, _Child} = supervisor:start_child(rabbit_sup,
+ {?SERVER, {?MODULE, start_link, []},
+ permanent, ?MAX_WAIT, worker,
+ [?SERVER]}),
+ ok.
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+store_recovery_terms(Name, Terms) ->
+ dets:insert(?MODULE, {Name, Terms}).
+
+had_clean_shutdown(Name) ->
+ ok == remove_recovery_terms(Name).
+
+read_recovery_terms(Name) ->
+ case dets:lookup(?MODULE, Name) of
+ [Terms] -> {ok, Terms};
+ _ -> {error, not_found}
+ end.
+
+remove_recovery_terms(Name) ->
+ case dets:member(?MODULE, Name) of
+ true -> dets:delete(?MODULE, Name);
+ false -> {error, not_found}
+ end.
+
+flush() ->
+ ok = dets:sync(?MODULE).
+
+init(_) ->
+ File = filename:join([rabbit_mnesia:dir(), "queues", ?CLEAN_FILENAME]),
+ {ok, _} = dets:open_file(?MODULE, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]),
+ {ok, undefined}.
+
+handle_call(Msg, _, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok = dets:sync(?MODULE).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52d0..43e8a3b875 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -391,7 +391,7 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
start_msg_store(
- [Ref || Terms <- AllTerms,
+ [Ref || {_, Terms} <- AllTerms,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined