summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-02-17 07:57:44 +0300
committerGitHub <noreply@github.com>2019-02-17 07:57:44 +0300
commitf6d93688a71bb765c91fe2cc3bba6e729d5fae5e (patch)
tree4361903e79cffd431387629e8c5cfd900a4d8631
parent111773916b2543541414ec5bd9084b97caaf9e5b (diff)
parent391eb24d1f2fd0d2dc381d2e078496bb9d0d2348 (diff)
downloadrabbitmq-server-git-f6d93688a71bb765c91fe2cc3bba6e729d5fae5e.tar.gz
Merge pull request #1878 from rabbitmq/rabbitmq-server-1873-binding-recovery
Recover bindings for all durable queues including failed to recover.
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_quorum_queue.erl3
-rw-r--r--src/rabbit_vhost.erl7
-rw-r--r--test/backing_queue_SUITE.erl3
4 files changed, 23 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 337786b571..dbf8693a96 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -108,12 +108,16 @@ warn_file_limit() ->
ok
end.
--spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+-spec recover(rabbit_types:vhost()) ->
+ {RecoveredClassic :: [amqqueue:amqqueue()],
+ FailedClassic :: [amqqueue:amqqueue()],
+ Quorum :: [amqqueue:amqqueue()]}.
recover(VHost) ->
- Classic = find_local_durable_classic_queues(VHost),
+ AllClassic = find_local_durable_classic_queues(VHost),
Quorum = find_local_quorum_queues(VHost),
- recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
+ {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic),
+ {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}.
recover_classic_queues(VHost, Queues) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
{ok, _} ->
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
+ RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs],
+ FailedQueues = [Q || Q <- Queues,
+ not lists:member(amqqueue:get_name(Q), RecoveredNames)],
+ {RecoveredQs, FailedQueues};
{error, Reason} ->
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
throw({error, Reason})
end.
-filter_per_type(Queues) ->
- lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
-
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
@@ -156,12 +161,14 @@ stop(VHost) ->
-spec start([amqqueue:amqqueue()]) -> 'ok'.
start(Qs) ->
- {Classic, _Quorum} = filter_per_type(Qs),
%% At this point all recovered queues and their bindings are
%% visible to routing, so now it is safe for them to complete
%% their initialisation (which may involve interacting with other
%% queues).
- _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
+ _ = [amqqueue:get_pid(Q) ! {self(), go}
+ || Q <- Qs,
+ %% All queues are supposed to be classic here.
+ amqqueue:is_classic(Q)],
ok.
mark_local_durable_queues_stopped(VHost) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 79c07535a4..083acbb2d2 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -286,8 +286,7 @@ reductions(Name) ->
0
end.
--spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(), atom()}].
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
recover(Queues) ->
[begin
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 9180f9ca0a..1721c9b806 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,10 +53,11 @@ recover(VHost) ->
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
- Qs = rabbit_amqqueue:recover(VHost),
- QNames = [amqqueue:get_name(Q) || Q <- Qs],
+ {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost),
+ AllQs = RecoveredClassic ++ FailedClassic ++ Quorum,
+ QNames = [amqqueue:get_name(Q) || Q <- AllQs],
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
- ok = rabbit_amqqueue:start(Qs),
+ ok = rabbit_amqqueue:start(RecoveredClassic),
%% Start queue mirrors.
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index c3f87cce59..d262e4c513 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -733,7 +733,8 @@ bq_queue_recover1(Config) ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
- rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
+ {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+ rabbit_amqqueue:start(Recovered),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,