diff options
| author | Tim Watson <watson.timothy@gmail.com> | 2014-01-13 15:22:59 +0000 |
|---|---|---|
| committer | Tim Watson <watson.timothy@gmail.com> | 2014-01-13 15:22:59 +0000 |
| commit | c42b9cdcfee2e3cea6d9ae29a864bbfde56d654c (patch) | |
| tree | 00218ef1c3b3842d3c44ef909adc68d17a1454d3 | |
| parent | f4cea4aea45d19284ccbd1500f003fd52366575b (diff) | |
| download | rabbitmq-server-git-c42b9cdcfee2e3cea6d9ae29a864bbfde56d654c.tar.gz | |
Refactor to avoid O(N*2) lookups during queue recovery
We remove knowledge of queue directories from rabbit_amqqueue, opting
to key index recovery terms off the amqqueue record name (which is a
resource record) instead. Although this simplifies the code somewhat
and avoid a potentially costly lookup during queue initialisation, it
does require a change to the backing queue API, since we now wish for
r_amqqueue:recover/0 to iterate over all the queues (paired with their
recovery terms, if any) and this means passing #amqqueue{} records
around instead of using a #resource{} and/or directory name as keys.
Also see rabbit_recovery_terms:read/1, which has gained an extra
parameter, since during upgrades we have no access to #amqqueue{}
records and /must/ therefore key any upgraded recovery data on the
queue directory (basename) instead. This double keyed lookup is
particularly gross since we could look the dirname up ourselves in
rabbit_recovery_terms:read/1, but doing so avoids the need to export
queue_name_to_dir_name from the qi _and_ calculating the MD5 on the
queue’s name twice, since the qi (which is calling into read/2) has
already done that anyway.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 23 |
5 files changed, 80 insertions, 81 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4c9b86d4dc..fefb4907f3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, - assert_equivalence/5, queue_name_to_dir_name/1, + assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -117,7 +117,6 @@ (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table()}]). --spec(queue_name_to_dir_name/1 :: (rabbit_types:r('queue')) -> string()). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) @@ -196,13 +195,13 @@ recover() -> on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), - {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), + {ok, Queues} = BQ:start(DurableQueues), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(DurableQueues, Terms). + recover_durable_queues(Queues). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -230,18 +229,13 @@ find_durable_queues() -> node(Pid) == Node])) end). -recover_durable_queues(DurableQueues, RecoveryTerms) -> - Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}]. +recover_durable_queues(DurableQueues) -> + Qs = [{start_queue_process(node(), Q), Terms} || + {Q, Terms} <- DurableQueues], + [Q || {Q, Terms} <- Qs, queue_init(Q, Terms) == {new, Q}]. -queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) -> - RecoveryKey = queue_name_to_dir_name(Name), - QueueRecoveryTerms = case rabbit_recovery_terms:lookup(RecoveryKey, - RecoveryTerms) of - {_, Terms} -> Terms; - false -> non_clean_shutdown - end, - gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity). +queue_init(#amqqueue{ pid = Pid }, RecoveryTerms) -> + gen_server2:call(Pid, {init, {self(), RecoveryTerms}}, infinity). declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), @@ -528,10 +522,6 @@ notify_policy_changed(#amqqueue{pid = QPid}) -> consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary(Name)), - rabbit_misc:format("~.36B", [Num]). - consumer_info_keys() -> ?CONSUMER_INFO_KEYS. consumers_all(VHostPath) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 3c620d971e..9e1ebf4187 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -41,7 +41,7 @@ %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency %% of those queues, or initialise any other shared resources. --callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). +-callback start([rabbit_types:amqqueue()]) -> rabbit_types:ok(recovery_terms()). %% Called to tear down any state/resources. NB: Implementations should %% not depend on this function being called on shutdown and instead diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 2c6ca32203..57f70a7bd1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,8 +21,6 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). --export([scan/3]). - -export([add_queue_ttl/0, avoid_zeroes/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -161,7 +159,7 @@ %%---------------------------------------------------------------------------- --record(qistate, { dir, segments, journal_handle, dirty_count, +-record(qistate, { dir, qname, segments, journal_handle, dirty_count, max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -221,15 +219,9 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([rabbit_amqqueue:name()]) -> +-spec(recover/1 :: ([rabbit_types:amqqueue()]) -> {[{file:filename(), [any()]}], {walker(A), A}}). --spec(scan/3 :: (file:filename(), - fun ((seq_id(), rabbit_types:msg_id(), - rabbit_types:message_properties(), boolean(), - ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), - A) -> A). - -spec(add_queue_ttl/0 :: () -> 'ok'). -endif. @@ -262,9 +254,9 @@ recover(Name, {Recovery, Terms}, MsgStoreRecovered, false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State = #qistate { dir = Dir }) -> +terminate(Terms, State = #qistate { qname = QueueName }) -> {SegmentCounts, State1} = terminate(State), - rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]), + rabbit_recovery_terms:store(QueueName, [{segments, SegmentCounts} | Terms]), State1. delete_and_terminate(State) -> @@ -365,19 +357,21 @@ recover(DurableQueues) -> DurableDict = dict:from_list( [ begin - DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue), + #amqqueue{name = QueueName} = Queue, + DirName = queue_name_to_dir_name(QueueName), {DirName, Queue} end || Queue <- DurableQueues ]), {DurableQueueNames, DurableTerms} = dict:fold( - fun (QueueDirName, QueueName, {DurableAcc, TermsAcc}) -> - TermsAcc1 = - case rabbit_recovery_terms:read(QueueDirName) of - {error, _} -> TermsAcc; - {ok, Terms} -> [{QueueDirName, Terms} | TermsAcc] + fun (QueueDirName, Queue=#amqqueue{name = QName}, + {DurableAcc, TermsAcc}) -> + RecoveryInfo = + case rabbit_recovery_terms:read(QName, QueueDirName) of + {error, _} -> {Queue, non_clean_shutdown}; + {ok, Terms} -> {Queue, Terms} end, - {[QueueName | DurableAcc], TermsAcc1} + {[QName | DurableAcc], [RecoveryInfo | TermsAcc]} end, {[], []}, DurableDict), %% Any queue directory we've not been asked to recover is considered garbage @@ -407,13 +401,15 @@ all_queue_directory_names(Dir) -> blank_state(QueueName) -> blank_state_dir( + QueueName, filename:join(queues_dir(), - rabbit_amqqueue:queue_name_to_dir_name(QueueName))). + queue_name_to_dir_name(QueueName))). -blank_state_dir(Dir) -> +blank_state_dir(QueueName, Dir) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, + qname = QueueName, segments = segments_new(), journal_handle = undefined, dirty_count = 0, @@ -504,6 +500,10 @@ recover_message(false, _, del, RelSeq, Segment) -> recover_message(false, _, no_del, RelSeq, Segment) -> add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + <<Num:128>> = erlang:md5(term_to_binary(Name)), + rabbit_misc:format("~.36B", [Num]). + queues_dir() -> filename:join(rabbit_mnesia:dir(), "queues"). @@ -545,9 +545,6 @@ queue_index_walker_reader(QueueName, Gatherer) -> end, ok, State), ok = gatherer:finish(Gatherer). -scan(Dir, Fun, Acc) -> - scan_segments(Fun, Acc, blank_state_dir(Dir)). - scan_segments(Fun, Acc, State) -> State1 = #qistate { segments = Segments, dir = Dir } = recover_journal(State), diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index e80a6be53b..16a9d032c9 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -22,7 +22,7 @@ -behaviour(gen_server). -export([recover/0, upgrade_recovery_terms/0, start_link/0, - store/2, read/1, lookup/2, clear/0, flush/0]). + store/2, read/2, lookup/2, clear/0, flush/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -35,7 +35,8 @@ -spec(upgrade_recovery_terms() -> 'ok'). -spec(start_link() -> rabbit_types:ok_pid_or_error()). -spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())). --spec(read(file:filename()) -> rabbit_types:ok_or_error(not_found)). +-spec(read(rabbit_amqqueue:name(), file:filename()) -> + rabbit_types:ok_or_error(not_found)). -spec(lookup(file:filename(), [{file:filename(), [term()]}]) -> {'ok', [term()]} | 'false'). -spec(clear() -> 'ok'). @@ -44,6 +45,7 @@ -include("rabbit.hrl"). -define(SERVER, ?MODULE). +-define(UPGRADE_TABLE, rabbit_recovery_upgrades). recover() -> case supervisor:start_child(rabbit_sup, @@ -56,33 +58,35 @@ recover() -> end. upgrade_recovery_terms() -> - create_table(), - try - QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), - DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true, - fun(F, Acc) -> [F|Acc] end, []), - [begin - {ok, Terms} = rabbit_file:read_term_file(File), - ok = store(filename:dirname(File), Terms), - case file:delete(File) of - {error, E} -> - rabbit_log:warning("Unable to delete recovery index" - "~s during upgrade: ~p~n", [File, E]); - ok -> - ok - end - end || File <- DotFiles], - ok - after - flush() - end. + create_tables(), + QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), + DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true, + fun(F, Acc) -> [F|Acc] end, []), + [begin + {ok, Terms} = rabbit_file:read_term_file(File), + ok = ets:insert_new(?UPGRADE_TABLE, {filename:dirname(File), Terms}), + case file:delete(File) of + {error, E} -> + rabbit_log:warning("Unable to delete recovery index" + "~s during upgrade: ~p~n", [File, E]); + ok -> + ok + end + end || File <- DotFiles], + ok. start_link() -> gen_server:start_link(?MODULE, [], []). -store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}). +store(QueueName, Terms) -> dets:insert(?MODULE, {QueueName, Terms}). + +read(QueueName, QueueDir) -> + case dets:lookup(?MODULE, QueueName) of + [{_, Terms}] -> {ok, Terms}; + _ -> read_from_upgrades(QueueDir) + end. -read(QueueDir) -> - case dets:lookup(?MODULE, to_key(QueueDir)) of +read_from_upgrades(QueueDir) -> + case ets:lookup(?UPGRADE_TABLE, QueueDir) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. @@ -92,11 +96,12 @@ lookup(QueueName, Terms) -> clear() -> dets:delete_all_objects(?MODULE), + ets:delete_all_objects(?UPGRADE_TABLE), flush(). init(_) -> process_flag(trap_exit, true), - create_table(), + create_tables(), {ok, undefined}. handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. @@ -114,11 +119,13 @@ code_change(_OldVsn, State, _Extra) -> flush() -> dets:sync(?MODULE). -create_table() -> +create_tables() -> File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), {ok, _} = dets:open_file(?MODULE, [{file, File}, {ram_file, true}, - {auto_save, infinity}]). + {auto_save, infinity}]), + ets:new(?UPGRADE_TABLE, [set, public, named_table]), + ok. to_key(QueueDir) -> filename:basename(QueueDir). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b77c1bcb02..ac0b0f78fd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -389,15 +389,20 @@ %%---------------------------------------------------------------------------- start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), - start_msg_store( - [Ref || {_, Terms} <- AllTerms, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), - {ok, AllTerms}. + {Terms, StartFunState} = rabbit_queue_index:recover(DurableQueues), + start_msg_store(persistent_refs(Terms), StartFunState), + {ok, Terms}. + +persistent_refs(Terms) -> lists:foldl(fun persistent_refs/2, [], Terms). + +persistent_refs({_, non_clean_shutdown}, Acc) -> + Acc; +persistent_refs({_, Terms}, Acc) -> + Ref = proplists:get_value(persistent_ref, Terms), + case Ref of + undefined -> Acc; + _ -> [Ref | Acc] + end. stop() -> stop_msg_store(). |
