summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2014-01-13 15:22:59 +0000
committerTim Watson <watson.timothy@gmail.com>2014-01-13 15:22:59 +0000
commitc42b9cdcfee2e3cea6d9ae29a864bbfde56d654c (patch)
tree00218ef1c3b3842d3c44ef909adc68d17a1454d3
parentf4cea4aea45d19284ccbd1500f003fd52366575b (diff)
downloadrabbitmq-server-git-c42b9cdcfee2e3cea6d9ae29a864bbfde56d654c.tar.gz
Refactor to avoid O(N*2) lookups during queue recovery
We remove knowledge of queue directories from rabbit_amqqueue, opting to key index recovery terms off the amqqueue record name (which is a resource record) instead. Although this simplifies the code somewhat and avoid a potentially costly lookup during queue initialisation, it does require a change to the backing queue API, since we now wish for r_amqqueue:recover/0 to iterate over all the queues (paired with their recovery terms, if any) and this means passing #amqqueue{} records around instead of using a #resource{} and/or directory name as keys. Also see rabbit_recovery_terms:read/1, which has gained an extra parameter, since during upgrades we have no access to #amqqueue{} records and /must/ therefore key any upgraded recovery data on the queue directory (basename) instead. This double keyed lookup is particularly gross since we could look the dirname up ourselves in rabbit_recovery_terms:read/1, but doing so avoids the need to export queue_name_to_dir_name from the qi _and_ calculating the MD5 on the queue’s name twice, since the qi (which is calling into read/2) has already done that anyway.
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_queue_index.erl45
-rw-r--r--src/rabbit_recovery_terms.erl63
-rw-r--r--src/rabbit_variable_queue.erl23
5 files changed, 80 insertions, 81 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4c9b86d4dc..fefb4907f3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- assert_equivalence/5, queue_name_to_dir_name/1,
+ assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -117,7 +117,6 @@
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table()}]).
--spec(queue_name_to_dir_name/1 :: (rabbit_types:r('queue')) -> string()).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -196,13 +195,13 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+ {ok, Queues} = BQ:start(DurableQueues),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues, Terms).
+ recover_durable_queues(Queues).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -230,18 +229,13 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues, RecoveryTerms) ->
- Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}].
+recover_durable_queues(DurableQueues) ->
+ Qs = [{start_queue_process(node(), Q), Terms} ||
+ {Q, Terms} <- DurableQueues],
+ [Q || {Q, Terms} <- Qs, queue_init(Q, Terms) == {new, Q}].
-queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) ->
- RecoveryKey = queue_name_to_dir_name(Name),
- QueueRecoveryTerms = case rabbit_recovery_terms:lookup(RecoveryKey,
- RecoveryTerms) of
- {_, Terms} -> Terms;
- false -> non_clean_shutdown
- end,
- gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity).
+queue_init(#amqqueue{ pid = Pid }, RecoveryTerms) ->
+ gen_server2:call(Pid, {init, {self(), RecoveryTerms}}, infinity).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -528,10 +522,6 @@ notify_policy_changed(#amqqueue{pid = QPid}) ->
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 3c620d971e..9e1ebf4187 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -41,7 +41,7 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
+-callback start([rabbit_types:amqqueue()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2c6ca32203..57f70a7bd1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,8 +21,6 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
--export([scan/3]).
-
-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -161,7 +159,7 @@
%%----------------------------------------------------------------------------
--record(qistate, { dir, segments, journal_handle, dirty_count,
+-record(qistate, { dir, qname, segments, journal_handle, dirty_count,
max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -221,15 +219,9 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
+-spec(recover/1 :: ([rabbit_types:amqqueue()]) ->
{[{file:filename(), [any()]}], {walker(A), A}}).
--spec(scan/3 :: (file:filename(),
- fun ((seq_id(), rabbit_types:msg_id(),
- rabbit_types:message_properties(), boolean(),
- ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
- A) -> A).
-
-spec(add_queue_ttl/0 :: () -> 'ok').
-endif.
@@ -262,9 +254,9 @@ recover(Name, {Recovery, Terms}, MsgStoreRecovered,
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State = #qistate { dir = Dir }) ->
+terminate(Terms, State = #qistate { qname = QueueName }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]),
+ rabbit_recovery_terms:store(QueueName, [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -365,19 +357,21 @@ recover(DurableQueues) ->
DurableDict =
dict:from_list(
[ begin
- DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue),
+ #amqqueue{name = QueueName} = Queue,
+ DirName = queue_name_to_dir_name(QueueName),
{DirName, Queue}
end || Queue <- DurableQueues ]),
{DurableQueueNames, DurableTerms} =
dict:fold(
- fun (QueueDirName, QueueName, {DurableAcc, TermsAcc}) ->
- TermsAcc1 =
- case rabbit_recovery_terms:read(QueueDirName) of
- {error, _} -> TermsAcc;
- {ok, Terms} -> [{QueueDirName, Terms} | TermsAcc]
+ fun (QueueDirName, Queue=#amqqueue{name = QName},
+ {DurableAcc, TermsAcc}) ->
+ RecoveryInfo =
+ case rabbit_recovery_terms:read(QName, QueueDirName) of
+ {error, _} -> {Queue, non_clean_shutdown};
+ {ok, Terms} -> {Queue, Terms}
end,
- {[QueueName | DurableAcc], TermsAcc1}
+ {[QName | DurableAcc], [RecoveryInfo | TermsAcc]}
end, {[], []}, DurableDict),
%% Any queue directory we've not been asked to recover is considered garbage
@@ -407,13 +401,15 @@ all_queue_directory_names(Dir) ->
blank_state(QueueName) ->
blank_state_dir(
+ QueueName,
filename:join(queues_dir(),
- rabbit_amqqueue:queue_name_to_dir_name(QueueName))).
+ queue_name_to_dir_name(QueueName))).
-blank_state_dir(Dir) ->
+blank_state_dir(QueueName, Dir) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
+ qname = QueueName,
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
@@ -504,6 +500,10 @@ recover_message(false, _, del, RelSeq, Segment) ->
recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
@@ -545,9 +545,6 @@ queue_index_walker_reader(QueueName, Gatherer) ->
end, ok, State),
ok = gatherer:finish(Gatherer).
-scan(Dir, Fun, Acc) ->
- scan_segments(Fun, Acc, blank_state_dir(Dir)).
-
scan_segments(Fun, Acc, State) ->
State1 = #qistate { segments = Segments, dir = Dir } =
recover_journal(State),
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index e80a6be53b..16a9d032c9 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -22,7 +22,7 @@
-behaviour(gen_server).
-export([recover/0, upgrade_recovery_terms/0, start_link/0,
- store/2, read/1, lookup/2, clear/0, flush/0]).
+ store/2, read/2, lookup/2, clear/0, flush/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -35,7 +35,8 @@
-spec(upgrade_recovery_terms() -> 'ok').
-spec(start_link() -> rabbit_types:ok_pid_or_error()).
-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())).
--spec(read(file:filename()) -> rabbit_types:ok_or_error(not_found)).
+-spec(read(rabbit_amqqueue:name(), file:filename()) ->
+ rabbit_types:ok_or_error(not_found)).
-spec(lookup(file:filename(),
[{file:filename(), [term()]}]) -> {'ok', [term()]} | 'false').
-spec(clear() -> 'ok').
@@ -44,6 +45,7 @@
-include("rabbit.hrl").
-define(SERVER, ?MODULE).
+-define(UPGRADE_TABLE, rabbit_recovery_upgrades).
recover() ->
case supervisor:start_child(rabbit_sup,
@@ -56,33 +58,35 @@ recover() ->
end.
upgrade_recovery_terms() ->
- create_table(),
- try
- QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
- DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true,
- fun(F, Acc) -> [F|Acc] end, []),
- [begin
- {ok, Terms} = rabbit_file:read_term_file(File),
- ok = store(filename:dirname(File), Terms),
- case file:delete(File) of
- {error, E} ->
- rabbit_log:warning("Unable to delete recovery index"
- "~s during upgrade: ~p~n", [File, E]);
- ok ->
- ok
- end
- end || File <- DotFiles],
- ok
- after
- flush()
- end.
+ create_tables(),
+ QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
+ DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true,
+ fun(F, Acc) -> [F|Acc] end, []),
+ [begin
+ {ok, Terms} = rabbit_file:read_term_file(File),
+ ok = ets:insert_new(?UPGRADE_TABLE, {filename:dirname(File), Terms}),
+ case file:delete(File) of
+ {error, E} ->
+ rabbit_log:warning("Unable to delete recovery index"
+ "~s during upgrade: ~p~n", [File, E]);
+ ok ->
+ ok
+ end
+ end || File <- DotFiles],
+ ok.
start_link() -> gen_server:start_link(?MODULE, [], []).
-store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}).
+store(QueueName, Terms) -> dets:insert(?MODULE, {QueueName, Terms}).
+
+read(QueueName, QueueDir) ->
+ case dets:lookup(?MODULE, QueueName) of
+ [{_, Terms}] -> {ok, Terms};
+ _ -> read_from_upgrades(QueueDir)
+ end.
-read(QueueDir) ->
- case dets:lookup(?MODULE, to_key(QueueDir)) of
+read_from_upgrades(QueueDir) ->
+ case ets:lookup(?UPGRADE_TABLE, QueueDir) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
@@ -92,11 +96,12 @@ lookup(QueueName, Terms) ->
clear() ->
dets:delete_all_objects(?MODULE),
+ ets:delete_all_objects(?UPGRADE_TABLE),
flush().
init(_) ->
process_flag(trap_exit, true),
- create_table(),
+ create_tables(),
{ok, undefined}.
handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
@@ -114,11 +119,13 @@ code_change(_OldVsn, State, _Extra) ->
flush() -> dets:sync(?MODULE).
-create_table() ->
+create_tables() ->
File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
{ok, _} = dets:open_file(?MODULE, [{file, File},
{ram_file, true},
- {auto_save, infinity}]).
+ {auto_save, infinity}]),
+ ets:new(?UPGRADE_TABLE, [set, public, named_table]),
+ ok.
to_key(QueueDir) -> filename:basename(QueueDir).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b77c1bcb02..ac0b0f78fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -389,15 +389,20 @@
%%----------------------------------------------------------------------------
start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
- start_msg_store(
- [Ref || {_, Terms} <- AllTerms,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
- {ok, AllTerms}.
+ {Terms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
+ start_msg_store(persistent_refs(Terms), StartFunState),
+ {ok, Terms}.
+
+persistent_refs(Terms) -> lists:foldl(fun persistent_refs/2, [], Terms).
+
+persistent_refs({_, non_clean_shutdown}, Acc) ->
+ Acc;
+persistent_refs({_, Terms}, Acc) ->
+ Ref = proplists:get_value(persistent_ref, Terms),
+ case Ref of
+ undefined -> Acc;
+ _ -> [Ref | Acc]
+ end.
stop() -> stop_msg_store().