summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Daniil Fedotov <hairyhum@gmail.com> 2019-02-12 17:26:47 -0500
committer Daniil Fedotov <hairyhum@gmail.com> 2019-02-12 17:26:47 -0500
commit 0d0f39d8a3e3edc23d0aa9758fd4c15e01c7afda (patch)
tree ef22d3150c295646a2f098c24582b783047c9859
parent b0dfe9352f083607d5d7346bd3962ba6c50cc03a (diff)
download rabbitmq-server-git-0d0f39d8a3e3edc23d0aa9758fd4c15e01c7afda.tar.gz
Recover bindings for all durable queues, including those that failed to recover.
If a queue fails to recover, it may still be restarted by the supervisor and eventually start. After that, some bindings may be present in rabbit_durable_route but not in rabbit_route, which can cause "binding not found" errors. If bindings are also recovered for queues that failed to recover, the behaviour will be the same as for crashed queues (which is currently broken, but needs to be fixed separately). Addresses #1873 [#163919158]
-rw-r--r-- src/rabbit_amqqueue.erl 23
-rw-r--r-- src/rabbit_quorum_queue.erl 3
-rw-r--r-- src/rabbit_vhost.erl 5
-rw-r--r-- test/backing_queue_SUITE.erl 3
4 files changed, 21 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c9c120df77..f366eb43f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -108,12 +108,16 @@ warn_file_limit() ->
ok
end.
--spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+-spec recover(rabbit_types:vhost()) ->
+ {ClassicOk :: [amqqueue:amqqueue()],
+ ClassicFailed :: [amqqueue:amqqueue()],
+ Quorum :: [amqqueue:amqqueue()]}.
recover(VHost) ->
Classic = find_local_durable_classic_queues(VHost),
Quorum = find_local_quorum_queues(VHost),
- recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
+ {ClassicOk, ClassicFailed} = recover_classic_queues(VHost, Classic),
+ {ClassicOk, ClassicFailed, rabbit_quorum_queue:recover(Quorum)}.
recover_classic_queues(VHost, Queues) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
{ok, _} ->
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ OkQueues = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
+ OkQueuesNames = [amqqueue:get_name(Q) || Q <- OkQueues],
+ FailedQueues = [Q || Q <- Queues,
+ not lists:member(amqqueue:get_name(Q), OkQueuesNames)],
+ {OkQueues, FailedQueues};
{error, Reason} ->
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
throw({error, Reason})
end.
-filter_per_type(Queues) ->
- lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
-
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
@@ -156,12 +161,14 @@ stop(VHost) ->
-spec start([amqqueue:amqqueue()]) -> 'ok'.
start(Qs) ->
- {Classic, _Quorum} = filter_per_type(Qs),
%% At this point all recovered queues and their bindings are
%% visible to routing, so now it is safe for them to complete
%% their initialisation (which may involve interacting with other
%% queues).
- _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
+ _ = [amqqueue:get_pid(Q) ! {self(), go}
+ || Q <- Qs,
+ %% All queues are supposed to be classic here.
+ amqqueue:is_classic(Q)],
ok.
mark_local_durable_queues_stopped(VHost) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index d0d464cda3..613623b43f 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -286,8 +286,7 @@ reductions(Name) ->
0
end.
--spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(), atom()}].
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
recover(Queues) ->
[begin
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 9180f9ca0a..d0f028a845 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,10 +53,11 @@ recover(VHost) ->
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
- Qs = rabbit_amqqueue:recover(VHost),
+ {ClassicOk, ClassicFailed, Quorum} = rabbit_amqqueue:recover(VHost),
+ Qs = ClassicOk ++ ClassicFailed ++ Quorum,
QNames = [amqqueue:get_name(Q) || Q <- Qs],
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
- ok = rabbit_amqqueue:start(Qs),
+ ok = rabbit_amqqueue:start(ClassicOk),
%% Start queue mirrors.
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index c3f87cce59..d262e4c513 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -733,7 +733,8 @@ bq_queue_recover1(Config) ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
- rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
+ {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+ rabbit_amqqueue:start(Recovered),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,