diff options
| author | Tim Watson <watson.timothy@gmail.com> | 2014-01-14 14:15:13 +0000 |
|---|---|---|
| committer | Tim Watson <watson.timothy@gmail.com> | 2014-01-14 14:15:13 +0000 |
| commit | dc23dce3c64329ab9a00877ba47be12ed0a1d6ff (patch) | |
| tree | 49bb520672144e94cb447e556cecdb2cc57aef14 | |
| parent | c42b9cdcfee2e3cea6d9ae29a864bbfde56d654c (diff) | |
| download | rabbitmq-server-git-dc23dce3c64329ab9a00877ba47be12ed0a1d6ff.tar.gz | |
Refactor / maintain a clean interface to the backing queue
Instead of passing amqqueue records to BQ:start/1, revert to passing
queue names and return a list of queue recovery terms ordered
identically to the given queue names. As a result, we can go back to
keying recovery data off the unique queue directory (base)name and no
longer need to track the queue name in the qi. We now also only need
the queue directory name to lookup recovery terms.
Also update the BQ interface documentation and callbacks/specs.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
5 files changed, 70 insertions, 82 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fefb4907f3..5357492f80 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,13 +195,18 @@ recover() -> on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), - {ok, Queues} = BQ:start(DurableQueues), + + %% We reply on BQ:start/1 returning the recovery terms in the same + %% order as the supplied queue names, so that we can zip them together + %% for further processing in recover_durable_queues. + {ok, OrderedRecoveryTerms} = + BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(Queues). + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -229,9 +234,9 @@ find_durable_queues() -> node(Pid) == Node])) end). -recover_durable_queues(DurableQueues) -> +recover_durable_queues(QueuesAndRecoveryTerms) -> Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- DurableQueues], + {Q, Terms} <- QueuesAndRecoveryTerms], [Q || {Q, Terms} <- Qs, queue_init(Q, Terms) == {new, Q}]. queue_init(#amqqueue{ pid = Pid }, RecoveryTerms) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9e1ebf4187..11563a4e39 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -41,7 +41,11 @@ %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency %% of those queues, or initialise any other shared resources. --callback start([rabbit_types:amqqueue()]) -> rabbit_types:ok(recovery_terms()). +%% +%% The list of queue recovery terms returned as {ok, Terms} MUST be given +%% in the same order as the list of queue names supplied. +%% +-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). %% Called to tear down any state/resources. NB: Implementations should %% not depend on this function being called on shutdown and instead diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 57f70a7bd1..50cd79eaf1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -159,7 +159,7 @@ %%---------------------------------------------------------------------------- --record(qistate, { dir, qname, segments, journal_handle, dirty_count, +-record(qistate, { dir, segments, journal_handle, dirty_count, max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -219,8 +219,7 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([rabbit_types:amqqueue()]) -> - {[{file:filename(), [any()]}], {walker(A), A}}). +-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). -spec(add_queue_ttl/0 :: () -> 'ok'). @@ -254,9 +253,9 @@ recover(Name, {Recovery, Terms}, MsgStoreRecovered, false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State = #qistate { qname = QueueName }) -> +terminate(Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), - rabbit_recovery_terms:store(QueueName, [{segments, SegmentCounts} | Terms]), + rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]), State1. delete_and_terminate(State) -> @@ -352,40 +351,33 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. -recover(DurableQueues) -> +recover(DurableQueueNames) -> rabbit_recovery_terms:recover(), - DurableDict = - dict:from_list( - [ begin - #amqqueue{name = QueueName} = Queue, - DirName = queue_name_to_dir_name(QueueName), - {DirName, Queue} - end || Queue <- DurableQueues ]), - - {DurableQueueNames, DurableTerms} = - dict:fold( - fun (QueueDirName, Queue=#amqqueue{name = QName}, - {DurableAcc, TermsAcc}) -> - RecoveryInfo = - case rabbit_recovery_terms:read(QName, QueueDirName) of - {error, _} -> {Queue, non_clean_shutdown}; - {ok, Terms} -> {Queue, Terms} - end, - {[QName | DurableAcc], [RecoveryInfo | TermsAcc]} - end, {[], []}, DurableDict), + {DurableTerms, DurableDirectories} = + lists:foldl( + fun(QName, {RecoveryTerms, ValidDirectories}) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + {[RecoveryInfo | RecoveryTerms], + sets:add_element(DirName, ValidDirectories)} + end, {[], sets:new()}, DurableQueueNames), %% Any queue directory we've not been asked to recover is considered garbage QueuesDir = queues_dir(), - lists:foreach( - fun(QueueDir) -> - case dict:is_key(filename:basename(QueueDir), DurableDict) of - true -> ok; - false -> ok = rabbit_file:recursive_delete([QueueDir]) - end - end, all_queue_directory_names(QueuesDir)), + [rabbit_file:recursive_delete([QueueDir]) || + QueueDir <- all_queue_directory_names(QueuesDir), + not sets:is_element(filename:basename(QueueDir), + DurableDirectories)], rabbit_recovery_terms:clear(), - {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> case rabbit_file:list_dir(Dir) of @@ -401,15 +393,13 @@ all_queue_directory_names(Dir) -> blank_state(QueueName) -> blank_state_dir( - QueueName, filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). -blank_state_dir(QueueName, Dir) -> +blank_state_dir(Dir) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, - qname = QueueName, segments = segments_new(), journal_handle = undefined, dirty_count = 0, diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 16a9d032c9..9ff8222d86 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -22,7 +22,7 @@ -behaviour(gen_server). -export([recover/0, upgrade_recovery_terms/0, start_link/0, - store/2, read/2, lookup/2, clear/0, flush/0]). + store/2, read/1, clear/0, flush/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -35,10 +35,7 @@ -spec(upgrade_recovery_terms() -> 'ok'). -spec(start_link() -> rabbit_types:ok_pid_or_error()). -spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())). --spec(read(rabbit_amqqueue:name(), file:filename()) -> - rabbit_types:ok_or_error(not_found)). --spec(lookup(file:filename(), - [{file:filename(), [term()]}]) -> {'ok', [term()]} | 'false'). +-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)). -spec(clear() -> 'ok'). -endif. % use_specs @@ -58,50 +55,44 @@ recover() -> end. upgrade_recovery_terms() -> - create_tables(), - QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), - DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true, - fun(F, Acc) -> [F|Acc] end, []), - [begin - {ok, Terms} = rabbit_file:read_term_file(File), - ok = ets:insert_new(?UPGRADE_TABLE, {filename:dirname(File), Terms}), - case file:delete(File) of - {error, E} -> - rabbit_log:warning("Unable to delete recovery index" - "~s during upgrade: ~p~n", [File, E]); - ok -> - ok - end - end || File <- DotFiles], - ok. + create_table(), + try + QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), + DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true, + fun(F, Acc) -> [F|Acc] end, []), + [begin + {ok, Terms} = rabbit_file:read_term_file(File), + ok = store(filename:dirname(File), Terms), + case file:delete(File) of + {error, E} -> + rabbit_log:warning("Unable to delete recovery index" + "~s during upgrade: ~p~n", [File, E]); + ok -> + ok + end + end || File <- DotFiles], + ok + after + flush() + end. start_link() -> gen_server:start_link(?MODULE, [], []). -store(QueueName, Terms) -> dets:insert(?MODULE, {QueueName, Terms}). +store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}). -read(QueueName, QueueDir) -> - case dets:lookup(?MODULE, QueueName) of - [{_, Terms}] -> {ok, Terms}; - _ -> read_from_upgrades(QueueDir) - end. - -read_from_upgrades(QueueDir) -> - case ets:lookup(?UPGRADE_TABLE, QueueDir) of +read(QueueDir) -> + case dets:lookup(?MODULE, QueueDir) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. -lookup(QueueName, Terms) -> - lists:keyfind(QueueName, 1, Terms). - clear() -> dets:delete_all_objects(?MODULE), - ets:delete_all_objects(?UPGRADE_TABLE), flush(). init(_) -> process_flag(trap_exit, true), - create_tables(), + create_table(), {ok, undefined}. handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. @@ -119,13 +110,11 @@ code_change(_OldVsn, State, _Extra) -> flush() -> dets:sync(?MODULE). -create_tables() -> +create_table() -> File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), {ok, _} = dets:open_file(?MODULE, [{file, File}, {ram_file, true}, - {auto_save, infinity}]), - ets:new(?UPGRADE_TABLE, [set, public, named_table]), - ok. + {auto_save, infinity}]). to_key(QueueDir) -> filename:basename(QueueDir). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac0b0f78fd..82326651f4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -395,9 +395,9 @@ start(DurableQueues) -> persistent_refs(Terms) -> lists:foldl(fun persistent_refs/2, [], Terms). -persistent_refs({_, non_clean_shutdown}, Acc) -> +persistent_refs(non_clean_shutdown, Acc) -> Acc; -persistent_refs({_, Terms}, Acc) -> +persistent_refs(Terms, Acc) -> Ref = proplists:get_value(persistent_ref, Terms), case Ref of undefined -> Acc; |
