summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2014-01-14 14:15:13 +0000
committerTim Watson <watson.timothy@gmail.com>2014-01-14 14:15:13 +0000
commitdc23dce3c64329ab9a00877ba47be12ed0a1d6ff (patch)
tree49bb520672144e94cb447e556cecdb2cc57aef14
parentc42b9cdcfee2e3cea6d9ae29a864bbfde56d654c (diff)
downloadrabbitmq-server-git-dc23dce3c64329ab9a00877ba47be12ed0a1d6ff.tar.gz
Refactor / maintain a clean interface to the backing queue
Instead of passing amqqueue records to BQ:start/1, revert to passing queue names and return a list of queue recovery terms ordered identically to the given queue names. As a result, we can go back to keying recovery data off the unique queue directory (base)name and no longer need to track the queue name in the qi. We now also only need the queue directory name to lookup recovery terms. Also update the BQ interface documentation and callbacks/specs.
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_queue_index.erl62
-rw-r--r--src/rabbit_recovery_terms.erl67
-rw-r--r--src/rabbit_variable_queue.erl4
5 files changed, 70 insertions, 82 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fefb4907f3..5357492f80 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,13 +195,18 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- {ok, Queues} = BQ:start(DurableQueues),
+
+ %% We reply on BQ:start/1 returning the recovery terms in the same
+ %% order as the supplied queue names, so that we can zip them together
+ %% for further processing in recover_durable_queues.
+ {ok, OrderedRecoveryTerms} =
+ BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(Queues).
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -229,9 +234,9 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
+recover_durable_queues(QueuesAndRecoveryTerms) ->
Qs = [{start_queue_process(node(), Q), Terms} ||
- {Q, Terms} <- DurableQueues],
+ {Q, Terms} <- QueuesAndRecoveryTerms],
[Q || {Q, Terms} <- Qs, queue_init(Q, Terms) == {new, Q}].
queue_init(#amqqueue{ pid = Pid }, RecoveryTerms) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9e1ebf4187..11563a4e39 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -41,7 +41,11 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_types:amqqueue()]) -> rabbit_types:ok(recovery_terms()).
+%%
+%% The list of queue recovery terms returned as {ok, Terms} MUST be given
+%% in the same order as the list of queue names supplied.
+%%
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 57f70a7bd1..50cd79eaf1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -159,7 +159,7 @@
%%----------------------------------------------------------------------------
--record(qistate, { dir, qname, segments, journal_handle, dirty_count,
+-record(qistate, { dir, segments, journal_handle, dirty_count,
max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -219,8 +219,7 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_types:amqqueue()]) ->
- {[{file:filename(), [any()]}], {walker(A), A}}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -254,9 +253,9 @@ recover(Name, {Recovery, Terms}, MsgStoreRecovered,
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State = #qistate { qname = QueueName }) ->
+terminate(Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_terms:store(QueueName, [{segments, SegmentCounts} | Terms]),
+ rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -352,40 +351,33 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
-recover(DurableQueues) ->
+recover(DurableQueueNames) ->
rabbit_recovery_terms:recover(),
- DurableDict =
- dict:from_list(
- [ begin
- #amqqueue{name = QueueName} = Queue,
- DirName = queue_name_to_dir_name(QueueName),
- {DirName, Queue}
- end || Queue <- DurableQueues ]),
-
- {DurableQueueNames, DurableTerms} =
- dict:fold(
- fun (QueueDirName, Queue=#amqqueue{name = QName},
- {DurableAcc, TermsAcc}) ->
- RecoveryInfo =
- case rabbit_recovery_terms:read(QName, QueueDirName) of
- {error, _} -> {Queue, non_clean_shutdown};
- {ok, Terms} -> {Queue, Terms}
- end,
- {[QName | DurableAcc], [RecoveryInfo | TermsAcc]}
- end, {[], []}, DurableDict),
+ {DurableTerms, DurableDirectories} =
+ lists:foldl(
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
QueuesDir = queues_dir(),
- lists:foreach(
- fun(QueueDir) ->
- case dict:is_key(filename:basename(QueueDir), DurableDict) of
- true -> ok;
- false -> ok = rabbit_file:recursive_delete([QueueDir])
- end
- end, all_queue_directory_names(QueuesDir)),
+ [rabbit_file:recursive_delete([QueueDir]) ||
+ QueueDir <- all_queue_directory_names(QueuesDir),
+ not sets:is_element(filename:basename(QueueDir),
+ DurableDirectories)],
rabbit_recovery_terms:clear(),
- {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
case rabbit_file:list_dir(Dir) of
@@ -401,15 +393,13 @@ all_queue_directory_names(Dir) ->
blank_state(QueueName) ->
blank_state_dir(
- QueueName,
filename:join(queues_dir(),
queue_name_to_dir_name(QueueName))).
-blank_state_dir(QueueName, Dir) ->
+blank_state_dir(Dir) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
- qname = QueueName,
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 16a9d032c9..9ff8222d86 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -22,7 +22,7 @@
-behaviour(gen_server).
-export([recover/0, upgrade_recovery_terms/0, start_link/0,
- store/2, read/2, lookup/2, clear/0, flush/0]).
+ store/2, read/1, clear/0, flush/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -35,10 +35,7 @@
-spec(upgrade_recovery_terms() -> 'ok').
-spec(start_link() -> rabbit_types:ok_pid_or_error()).
-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())).
--spec(read(rabbit_amqqueue:name(), file:filename()) ->
- rabbit_types:ok_or_error(not_found)).
--spec(lookup(file:filename(),
- [{file:filename(), [term()]}]) -> {'ok', [term()]} | 'false').
+-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)).
-spec(clear() -> 'ok').
-endif. % use_specs
@@ -58,50 +55,44 @@ recover() ->
end.
upgrade_recovery_terms() ->
- create_tables(),
- QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
- DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true,
- fun(F, Acc) -> [F|Acc] end, []),
- [begin
- {ok, Terms} = rabbit_file:read_term_file(File),
- ok = ets:insert_new(?UPGRADE_TABLE, {filename:dirname(File), Terms}),
- case file:delete(File) of
- {error, E} ->
- rabbit_log:warning("Unable to delete recovery index"
- "~s during upgrade: ~p~n", [File, E]);
- ok ->
- ok
- end
- end || File <- DotFiles],
- ok.
+ create_table(),
+ try
+ QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
+ DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true,
+ fun(F, Acc) -> [F|Acc] end, []),
+ [begin
+ {ok, Terms} = rabbit_file:read_term_file(File),
+ ok = store(filename:dirname(File), Terms),
+ case file:delete(File) of
+ {error, E} ->
+ rabbit_log:warning("Unable to delete recovery index"
+ "~s during upgrade: ~p~n", [File, E]);
+ ok ->
+ ok
+ end
+ end || File <- DotFiles],
+ ok
+ after
+ flush()
+ end.
start_link() -> gen_server:start_link(?MODULE, [], []).
-store(QueueName, Terms) -> dets:insert(?MODULE, {QueueName, Terms}).
+store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}).
-read(QueueName, QueueDir) ->
- case dets:lookup(?MODULE, QueueName) of
- [{_, Terms}] -> {ok, Terms};
- _ -> read_from_upgrades(QueueDir)
- end.
-
-read_from_upgrades(QueueDir) ->
- case ets:lookup(?UPGRADE_TABLE, QueueDir) of
+read(QueueDir) ->
+ case dets:lookup(?MODULE, QueueDir) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-lookup(QueueName, Terms) ->
- lists:keyfind(QueueName, 1, Terms).
-
clear() ->
dets:delete_all_objects(?MODULE),
- ets:delete_all_objects(?UPGRADE_TABLE),
flush().
init(_) ->
process_flag(trap_exit, true),
- create_tables(),
+ create_table(),
{ok, undefined}.
handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
@@ -119,13 +110,11 @@ code_change(_OldVsn, State, _Extra) ->
flush() -> dets:sync(?MODULE).
-create_tables() ->
+create_table() ->
File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
{ok, _} = dets:open_file(?MODULE, [{file, File},
{ram_file, true},
- {auto_save, infinity}]),
- ets:new(?UPGRADE_TABLE, [set, public, named_table]),
- ok.
+ {auto_save, infinity}]).
to_key(QueueDir) -> filename:basename(QueueDir).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac0b0f78fd..82326651f4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -395,9 +395,9 @@ start(DurableQueues) ->
persistent_refs(Terms) -> lists:foldl(fun persistent_refs/2, [], Terms).
-persistent_refs({_, non_clean_shutdown}, Acc) ->
+persistent_refs(non_clean_shutdown, Acc) ->
Acc;
-persistent_refs({_, Terms}, Acc) ->
+persistent_refs(Terms, Acc) ->
Ref = proplists:get_value(persistent_ref, Terms),
case Ref of
undefined -> Acc;