summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_queue_index.erl44
-rw-r--r--src/rabbit_recovery_terms.erl43
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl27
10 files changed, 105 insertions, 71 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c0010d62ed..0203b4e94c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -595,12 +595,11 @@ boot_delegate() ->
recover() ->
rabbit_policy:recover(),
- ok = rabbit_recovery_indexes:recover(),
+ ok = rabbit_recovery_terms:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs),
- ok = rabbit_recovery_indexes:flush().
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7c9..f611573bf2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- assert_equivalence/5,
+ assert_equivalence/5, queue_name_to_dir_name/1,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -117,6 +117,7 @@
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table()}]).
+-spec(queue_name_to_dir_name/1 :: (rabbit_types:amqqueue()) -> string()).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -195,13 +196,13 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+ {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues).
+ recover_durable_queues(DurableQueues, Terms).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -229,10 +230,17 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
+recover_durable_queues(DurableQueues, RecoveryTerms) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q = #amqqueue{pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
+ [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}].
+
+queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) ->
+ RecoveryKey = queue_name_to_dir_name(Name),
+ QueueRecoveryTerms = case lists:keyfind(RecoveryKey, 1, RecoveryTerms) of
+ {_, Terms} -> Terms;
+ false -> non_clean_shutdown
+ end,
+ gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -519,6 +527,10 @@ notify_policy_changed(#amqqueue{pid = QPid}) ->
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7002fd367c..3af2993e97 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -187,12 +187,14 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+declare(Recover, From,
+ State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
+ {IsRecovering, MediatorPid} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, IsRecovering) of
#amqqueue{} = Q1 ->
- case matches(Recover, Q, Q1) of
+ case matches(IsRecovering, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -202,7 +204,7 @@ declare(Recover, From, State = #q{q = Q,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(Recover),
+ recovery_barrier(MediatorPid),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -219,6 +221,11 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
+recovery_status(new) ->
+ {false, new};
+recovery_status({Recover, _}) ->
+ {true, Recover}.
+
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
@@ -254,7 +261,7 @@ decorator_callback(QName, F, A) ->
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover =/= new,
+ BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc29..603c34a93a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -27,7 +27,8 @@
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
--type(attempt_recovery() :: boolean()).
+-type(recovery_terms() :: [{file:filename(), [term()]}]).
+-type(attempt_recovery() :: {boolean(), recovery_terms()}).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
@@ -40,7 +41,7 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> 'ok'.
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e2bc32477d..ddd9a6f237 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -373,7 +373,7 @@ qc_default_exchange() ->
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, {false, []}, function(2, ok)]}.
qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc11..b578d1a688 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -114,7 +114,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, false),
+ BQS = bq_init(BQ, Q1, {false, []}),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4349a2f0fb..95cb9d977b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,7 +196,8 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(shutdown_terms() :: [any()]).
+-type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown').
+-type(shutdown_terms() :: {recovery_type(), [any()]}).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
@@ -244,19 +245,16 @@ init(Name, OnSyncFun) ->
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case rabbit_recovery_indexes:read_recovery_terms(Dir) of
+ case rabbit_recovery_terms:read(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+recover(Name, {Recovery, Terms}, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun) ->
+ State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown =
- case rabbit_recovery_indexes:remove_recovery_terms(Dir) of
- ok -> true;
- {error, not_found} -> false
- end,
+ CleanShutdown = Recovery =/= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
@@ -265,7 +263,7 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
terminate(Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_indexes:store_recovery_terms(
+ rabbit_recovery_terms:store(
Dir, [{segments, SegmentCounts} | Terms]),
State1.
@@ -363,8 +361,12 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
+ DurableDict =
+ dict:from_list(
+ [ begin
+ DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue),
+ {DirName, Queue}
+ end || Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
@@ -375,20 +377,23 @@ recover(DurableQueues) ->
case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case rabbit_recovery_indexes:read_recovery_terms(
+ case rabbit_recovery_terms:read(
QueueDirPath) of
{error, _} -> TermsAcc;
- {ok, Terms} -> [Terms | TermsAcc]
+ {ok, Terms} -> [{QueueDirPath, Terms} |
+ TermsAcc]
end,
{[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
ok = rabbit_file:recursive_delete([QueueDirPath]),
- rabbit_recovery_indexes:remove_recovery_terms(
- QueueDirPath),
+ %rabbit_recovery_indexes:remove_recovery_terms(
+ % QueueDirPath),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
+ rabbit_recovery_terms:clear(),
+ rabbit_recovery_terms:flush(),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
@@ -405,7 +410,8 @@ all_queue_directory_names(Dir) ->
blank_state(QueueName) ->
blank_state_dir(
- filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+ filename:join(queues_dir(),
+ rabbit_amqqueue:queue_name_to_dir_name(QueueName))).
blank_state_dir(Dir) ->
{ok, MaxJournal} =
@@ -501,10 +507,6 @@ recover_message(false, _, del, RelSeq, Segment) ->
recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 48af9530ed..f8138e0e4a 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -24,9 +24,9 @@
-export([recover/0,
upgrade_recovery_indexes/0,
start_link/0,
- store_recovery_terms/2,
- read_recovery_terms/1,
- remove_recovery_terms/1,
+ store/2,
+ read/1,
+ clear/0,
flush/0]).
-export([init/1,
@@ -43,15 +43,16 @@
-spec(recover() -> 'ok').
-spec(upgrade_recovery_indexes() -> 'ok').
-spec(start_link() -> rabbit_types:ok_pid_or_error()).
--spec(store_recovery_terms(
+-spec(store(
Name :: file:filename(),
Terms :: term()) -> rabbit_types:ok_or_error(term())).
--spec(read_recovery_terms(
- file:filename()) ->
- rabbit_types:ok_or_error(not_found)).
--spec(remove_recovery_terms(
+-spec(read(
file:filename()) ->
rabbit_types:ok_or_error(not_found)).
+-spec(clear() -> 'ok').
+%-spec(remove_recovery_terms(
+% file:filename()) ->
+% rabbit_types:ok_or_error(not_found)).
-endif. % use_specs
@@ -76,7 +77,7 @@ upgrade_recovery_indexes() ->
fun(F, Acc) -> [F|Acc] end, []),
[begin
{ok, Terms} = rabbit_file:read_term_file(File),
- ok = store_recovery_terms(File, Terms),
+ ok = store(File, Terms),
case file:delete(File) of
{error, E} ->
rabbit_log:warning("Unable to delete recovery index"
@@ -93,20 +94,26 @@ upgrade_recovery_indexes() ->
start_link() ->
gen_server:start_link(?MODULE, [], []).
-store_recovery_terms(Name, Terms) ->
- dets:insert(?MODULE, {Name, Terms}).
+store(Name, Terms) ->
+ dets:insert(?MODULE, {scrub(Name), Terms}).
-read_recovery_terms(Name) ->
- case dets:lookup(?MODULE, Name) of
+read(Name) ->
+ case dets:lookup(?MODULE, scrub(Name)) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-remove_recovery_terms(Name) ->
- case dets:member(?MODULE, Name) of
- true -> dets:delete(?MODULE, Name);
- _ -> {error, not_found}
- end.
+scrub(Name) ->
+ filename:basename(Name).
+
+%remove_recovery_terms(Name) ->
+% case dets:member(?MODULE, Name) of
+% true -> dets:delete(?MODULE, Name);
+% _ -> {error, not_found}
+% end.
+
+clear() ->
+ dets:delete_all_objects(?MODULE).
flush() ->
dets:sync(?MODULE),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3bf..7aafb23dbc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2129,11 +2129,12 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
+ %% TODO: shutdown_terms is no longer relevant - rework this test case
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, Terms, false,
+ TestQueue, {clean_shutdown, Terms}, false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52d0..1b29ceb357 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -391,12 +391,13 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
start_msg_store(
- [Ref || Terms <- AllTerms,
+ [Ref || {_, Terms} <- AllTerms,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- StartFunState).
+ StartFunState),
+ {ok, AllTerms}.
stop() -> stop_msg_store().
@@ -419,7 +420,7 @@ init(Queue, Recover, AsyncCallback) ->
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
@@ -430,21 +431,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, true,
+init(#amqqueue { name = QueueName, durable = true }, {_, Terms},
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, Terms1} =
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef1 -> {PRef1, Terms}
- end,
+ %% Terms = rabbit_queue_index:shutdown_terms(QueueName),
+ {PRef, Recovery, Terms1} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, Terms1,
+ QueueName, {Recovery, Terms1},
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
@@ -453,6 +450,14 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
+process_recovery_terms(Recovery=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Recovery, []};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), clean_shutdown, []};
+ PRef1 -> {PRef1, clean_shutdown, Terms}
+ end.
+
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,