diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-02-12 17:26:47 -0500 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2019-02-12 17:26:47 -0500 |
| commit | 0d0f39d8a3e3edc23d0aa9758fd4c15e01c7afda (patch) | |
| tree | ef22d3150c295646a2f098c24582b783047c9859 | |
| parent | b0dfe9352f083607d5d7346bd3962ba6c50cc03a (diff) | |
| download | rabbitmq-server-git-0d0f39d8a3e3edc23d0aa9758fd4c15e01c7afda.tar.gz | |
Recover bindings for all durable queues including failed to recover.
If a queue fails to recover it may still be restarted by the supervisor
and eventually start. After that some bindings may be in rabbit_durable_route
but not rabbit_route. This can cause binding not found errors.
If bindings are recovered for failed queues, the behaviour will be
the same as for the crashed queues. (which is currently broken
but needs to be fixed separately)
Addresses #1873
[#163919158]
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 5 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 3 |
4 files changed, 21 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c9c120df77..f366eb43f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -108,12 +108,16 @@ warn_file_limit() -> ok end. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec recover(rabbit_types:vhost()) -> + {ClassicOk :: [amqqueue:amqqueue()], + ClassicFailed :: [amqqueue:amqqueue()], + Quorum :: [amqqueue:amqqueue()]}. recover(VHost) -> Classic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), - recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum). + {ClassicOk, ClassicFailed} = recover_classic_queues(VHost, Classic), + {ClassicOk, ClassicFailed, rabbit_quorum_queue:recover(Quorum)}. recover_classic_queues(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) -> BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + OkQueues = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)), + OkQueuesNames = [amqqueue:get_name(Q) || Q <- OkQueues], + FailedQueues = [Q || Q <- Queues, + not lists:member(amqqueue:get_name(Q), OkQueuesNames)], + {OkQueues, FailedQueues}; {error, Reason} -> rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), throw({error, Reason}) end. -filter_per_type(Queues) -> - lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). - filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -156,12 +161,14 @@ stop(VHost) -> -spec start([amqqueue:amqqueue()]) -> 'ok'. start(Qs) -> - {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], ok. mark_local_durable_queues_stopped(VHost) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index d0d464cda3..613623b43f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -286,8 +286,7 @@ reductions(Name) -> 0 end. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. recover(Queues) -> [begin diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 9180f9ca0a..d0f028a845 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,10 +53,11 @@ recover(VHost) -> VHostStubFile = filename:join(VHostDir, ".vhost"), ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), - Qs = rabbit_amqqueue:recover(VHost), + {ClassicOk, ClassicFailed, Quorum} = rabbit_amqqueue:recover(VHost), + Qs = ClassicOk ++ ClassicFailed ++ Quorum, QNames = [amqqueue:get_name(Q) || Q <- Qs], ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), - ok = rabbit_amqqueue:start(Qs), + ok = rabbit_amqqueue:start(ClassicOk), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index c3f87cce59..d262e4c513 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -733,7 +733,8 @@ bq_queue_recover1(Config) -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), - rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)), + {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST), + rabbit_amqqueue:start(Recovered), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, |
