diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-17 07:57:44 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-17 07:57:44 +0300 |
| commit | f6d93688a71bb765c91fe2cc3bba6e729d5fae5e (patch) | |
| tree | 4361903e79cffd431387629e8c5cfd900a4d8631 | |
| parent | 111773916b2543541414ec5bd9084b97caaf9e5b (diff) | |
| parent | 391eb24d1f2fd0d2dc381d2e078496bb9d0d2348 (diff) | |
| download | rabbitmq-server-git-f6d93688a71bb765c91fe2cc3bba6e729d5fae5e.tar.gz | |
Merge pull request #1878 from rabbitmq/rabbitmq-server-1873-binding-recovery
Recover bindings for all durable queues including failed to recover.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 7 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 3 |
4 files changed, 23 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 337786b571..dbf8693a96 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -108,12 +108,16 @@ warn_file_limit() -> ok end. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec recover(rabbit_types:vhost()) -> + {RecoveredClassic :: [amqqueue:amqqueue()], + FailedClassic :: [amqqueue:amqqueue()], + Quorum :: [amqqueue:amqqueue()]}. recover(VHost) -> - Classic = find_local_durable_classic_queues(VHost), + AllClassic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), - recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum). + {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic), + {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}. recover_classic_queues(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) -> BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)), + RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs], + FailedQueues = [Q || Q <- Queues, + not lists:member(amqqueue:get_name(Q), RecoveredNames)], + {RecoveredQs, FailedQueues}; {error, Reason} -> rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), throw({error, Reason}) end. -filter_per_type(Queues) -> - lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). - filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -156,12 +161,14 @@ stop(VHost) -> -spec start([amqqueue:amqqueue()]) -> 'ok'. start(Qs) -> - {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], ok. mark_local_durable_queues_stopped(VHost) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 79c07535a4..083acbb2d2 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -286,8 +286,7 @@ reductions(Name) -> 0 end. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. recover(Queues) -> [begin diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 9180f9ca0a..1721c9b806 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,10 +53,11 @@ recover(VHost) -> VHostStubFile = filename:join(VHostDir, ".vhost"), ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), - Qs = rabbit_amqqueue:recover(VHost), - QNames = [amqqueue:get_name(Q) || Q <- Qs], + {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost), + AllQs = RecoveredClassic ++ FailedClassic ++ Quorum, + QNames = [amqqueue:get_name(Q) || Q <- AllQs], ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), - ok = rabbit_amqqueue:start(Qs), + ok = rabbit_amqqueue:start(RecoveredClassic), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index c3f87cce59..d262e4c513 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -733,7 +733,8 @@ bq_queue_recover1(Config) -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), - rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)), + {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST), + rabbit_amqqueue:start(Recovered), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, |
