summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2013-12-19 10:19:44 +0000
committerTim Watson <watson.timothy@gmail.com>2013-12-19 10:19:44 +0000
commit156fda04395ae9e853205cbe5f5d3ffa9c186709 (patch)
tree3e99d55dbb7558955dbc5d65989ca316bb28d679 /src
parent45817870930aff31097fce5c9801e50199c0faa0 (diff)
downloadrabbitmq-server-git-156fda04395ae9e853205cbe5f5d3ffa9c186709.tar.gz
Rework/Refactor to handle recovery terms up-front
We process all the recovery terms up-front, during qi recovery, and clear + sync the dets table immediately afterwards. The recovery terms and keys, based on the queue directory’s ‘basename’, are then passed throughout the initialisation process and checked in the various places they’re used.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_queue_index.erl44
-rw-r--r--src/rabbit_recovery_terms.erl43
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl27
10 files changed, 105 insertions, 71 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c0010d62ed..0203b4e94c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -595,12 +595,11 @@ boot_delegate() ->
recover() ->
rabbit_policy:recover(),
- ok = rabbit_recovery_indexes:recover(),
+ ok = rabbit_recovery_terms:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs),
- ok = rabbit_recovery_indexes:flush().
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7c9..f611573bf2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- assert_equivalence/5,
+ assert_equivalence/5, queue_name_to_dir_name/1,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -117,6 +117,7 @@
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table()}]).
+-spec(queue_name_to_dir_name/1 :: (rabbit_types:amqqueue()) -> string()).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -195,13 +196,13 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+ {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues).
+ recover_durable_queues(DurableQueues, Terms).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -229,10 +230,17 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
+recover_durable_queues(DurableQueues, RecoveryTerms) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q = #amqqueue{pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
+ [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}].
+
+queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) ->
+ RecoveryKey = queue_name_to_dir_name(Name),
+ QueueRecoveryTerms = case lists:keyfind(RecoveryKey, 1, RecoveryTerms) of
+ {_, Terms} -> Terms;
+ false -> non_clean_shutdown
+ end,
+ gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -519,6 +527,10 @@ notify_policy_changed(#amqqueue{pid = QPid}) ->
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7002fd367c..3af2993e97 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -187,12 +187,14 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+declare(Recover, From,
+ State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
+ {IsRecovering, MediatorPid} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, IsRecovering) of
#amqqueue{} = Q1 ->
- case matches(Recover, Q, Q1) of
+ case matches(IsRecovering, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -202,7 +204,7 @@ declare(Recover, From, State = #q{q = Q,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(Recover),
+ recovery_barrier(MediatorPid),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -219,6 +221,11 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
+recovery_status(new) ->
+ {false, new};
+recovery_status({Recover, _}) ->
+ {true, Recover}.
+
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
@@ -254,7 +261,7 @@ decorator_callback(QName, F, A) ->
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover =/= new,
+ BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc29..603c34a93a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -27,7 +27,8 @@
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
--type(attempt_recovery() :: boolean()).
+-type(recovery_terms() :: [{file:filename(), [term()]}]).
+-type(attempt_recovery() :: {boolean(), recovery_terms()}).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
@@ -40,7 +41,7 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> 'ok'.
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e2bc32477d..ddd9a6f237 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -373,7 +373,7 @@ qc_default_exchange() ->
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, {false, []}, function(2, ok)]}.
qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc11..b578d1a688 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -114,7 +114,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, false),
+ BQS = bq_init(BQ, Q1, {false, []}),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4349a2f0fb..95cb9d977b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,7 +196,8 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(shutdown_terms() :: [any()]).
+-type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown').
+-type(shutdown_terms() :: {recovery_type(), [any()]}).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
@@ -244,19 +245,16 @@ init(Name, OnSyncFun) ->
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case rabbit_recovery_indexes:read_recovery_terms(Dir) of
+ case rabbit_recovery_terms:read(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+recover(Name, {Recovery, Terms}, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun) ->
+ State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown =
- case rabbit_recovery_indexes:remove_recovery_terms(Dir) of
- ok -> true;
- {error, not_found} -> false
- end,
+ CleanShutdown = Recovery =/= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
@@ -265,7 +263,7 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
terminate(Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_indexes:store_recovery_terms(
+ rabbit_recovery_terms:store(
Dir, [{segments, SegmentCounts} | Terms]),
State1.
@@ -363,8 +361,12 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
+ DurableDict =
+ dict:from_list(
+ [ begin
+ DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue),
+ {DirName, Queue}
+ end || Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
@@ -375,20 +377,23 @@ recover(DurableQueues) ->
case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case rabbit_recovery_indexes:read_recovery_terms(
+ case rabbit_recovery_terms:read(
QueueDirPath) of
{error, _} -> TermsAcc;
- {ok, Terms} -> [Terms | TermsAcc]
+ {ok, Terms} -> [{QueueDirPath, Terms} |
+ TermsAcc]
end,
{[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
ok = rabbit_file:recursive_delete([QueueDirPath]),
- rabbit_recovery_indexes:remove_recovery_terms(
- QueueDirPath),
+ %rabbit_recovery_indexes:remove_recovery_terms(
+ % QueueDirPath),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
+ rabbit_recovery_terms:clear(),
+ rabbit_recovery_terms:flush(),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
@@ -405,7 +410,8 @@ all_queue_directory_names(Dir) ->
blank_state(QueueName) ->
blank_state_dir(
- filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+ filename:join(queues_dir(),
+ rabbit_amqqueue:queue_name_to_dir_name(QueueName))).
blank_state_dir(Dir) ->
{ok, MaxJournal} =
@@ -501,10 +507,6 @@ recover_message(false, _, del, RelSeq, Segment) ->
recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 48af9530ed..f8138e0e4a 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -24,9 +24,9 @@
-export([recover/0,
upgrade_recovery_indexes/0,
start_link/0,
- store_recovery_terms/2,
- read_recovery_terms/1,
- remove_recovery_terms/1,
+ store/2,
+ read/1,
+ clear/0,
flush/0]).
-export([init/1,
@@ -43,15 +43,16 @@
-spec(recover() -> 'ok').
-spec(upgrade_recovery_indexes() -> 'ok').
-spec(start_link() -> rabbit_types:ok_pid_or_error()).
--spec(store_recovery_terms(
+-spec(store(
Name :: file:filename(),
Terms :: term()) -> rabbit_types:ok_or_error(term())).
--spec(read_recovery_terms(
- file:filename()) ->
- rabbit_types:ok_or_error(not_found)).
--spec(remove_recovery_terms(
+-spec(read(
file:filename()) ->
rabbit_types:ok_or_error(not_found)).
+-spec(clear() -> 'ok').
+%-spec(remove_recovery_terms(
+% file:filename()) ->
+% rabbit_types:ok_or_error(not_found)).
-endif. % use_specs
@@ -76,7 +77,7 @@ upgrade_recovery_indexes() ->
fun(F, Acc) -> [F|Acc] end, []),
[begin
{ok, Terms} = rabbit_file:read_term_file(File),
- ok = store_recovery_terms(File, Terms),
+ ok = store(File, Terms),
case file:delete(File) of
{error, E} ->
rabbit_log:warning("Unable to delete recovery index"
@@ -93,20 +94,26 @@ upgrade_recovery_indexes() ->
start_link() ->
gen_server:start_link(?MODULE, [], []).
-store_recovery_terms(Name, Terms) ->
- dets:insert(?MODULE, {Name, Terms}).
+store(Name, Terms) ->
+ dets:insert(?MODULE, {scrub(Name), Terms}).
-read_recovery_terms(Name) ->
- case dets:lookup(?MODULE, Name) of
+read(Name) ->
+ case dets:lookup(?MODULE, scrub(Name)) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-remove_recovery_terms(Name) ->
- case dets:member(?MODULE, Name) of
- true -> dets:delete(?MODULE, Name);
- _ -> {error, not_found}
- end.
+scrub(Name) ->
+ filename:basename(Name).
+
+%remove_recovery_terms(Name) ->
+% case dets:member(?MODULE, Name) of
+% true -> dets:delete(?MODULE, Name);
+% _ -> {error, not_found}
+% end.
+
+clear() ->
+ dets:delete_all_objects(?MODULE).
flush() ->
dets:sync(?MODULE),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3bf..7aafb23dbc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2129,11 +2129,12 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
+ %% TODO: shutdown_terms is no longer relevant - rework this test case
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, Terms, false,
+ TestQueue, {clean_shutdown, Terms}, false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52d0..1b29ceb357 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -391,12 +391,13 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
start_msg_store(
- [Ref || Terms <- AllTerms,
+ [Ref || {_, Terms} <- AllTerms,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- StartFunState).
+ StartFunState),
+ {ok, AllTerms}.
stop() -> stop_msg_store().
@@ -419,7 +420,7 @@ init(Queue, Recover, AsyncCallback) ->
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
@@ -430,21 +431,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, true,
+init(#amqqueue { name = QueueName, durable = true }, {_, Terms},
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, Terms1} =
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef1 -> {PRef1, Terms}
- end,
+ %% Terms = rabbit_queue_index:shutdown_terms(QueueName),
+ {PRef, Recovery, Terms1} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, Terms1,
+ QueueName, {Recovery, Terms1},
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
@@ -453,6 +450,14 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
+process_recovery_terms(Recovery=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Recovery, []};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), clean_shutdown, []};
+ PRef1 -> {PRef1, clean_shutdown, Terms}
+ end.
+
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,