diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 7 |
3 files changed, 21 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 337786b571..dbf8693a96 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -108,12 +108,16 @@ warn_file_limit() -> ok end. --spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec recover(rabbit_types:vhost()) -> + {RecoveredClassic :: [amqqueue:amqqueue()], + FailedClassic :: [amqqueue:amqqueue()], + Quorum :: [amqqueue:amqqueue()]}. recover(VHost) -> - Classic = find_local_durable_classic_queues(VHost), + AllClassic = find_local_durable_classic_queues(VHost), Quorum = find_local_quorum_queues(VHost), - recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum). + {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic), + {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}. recover_classic_queues(VHost, Queues) -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), @@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) -> BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)), + RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs], + FailedQueues = [Q || Q <- Queues, + not lists:member(amqqueue:get_name(Q), RecoveredNames)], + {RecoveredQs, FailedQueues}; {error, Reason} -> rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), throw({error, Reason}) end. -filter_per_type(Queues) -> - lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). - filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -156,12 +161,14 @@ stop(VHost) -> -spec start([amqqueue:amqqueue()]) -> 'ok'. start(Qs) -> - {Classic, _Quorum} = filter_per_type(Qs), %% At this point all recovered queues and their bindings are %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], ok. mark_local_durable_queues_stopped(VHost) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 79c07535a4..083acbb2d2 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -286,8 +286,7 @@ reductions(Name) -> 0 end. --spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | - {'absent', amqqueue:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. recover(Queues) -> [begin diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 9180f9ca0a..1721c9b806 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,10 +53,11 @@ recover(VHost) -> VHostStubFile = filename:join(VHostDir, ".vhost"), ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), - Qs = rabbit_amqqueue:recover(VHost), - QNames = [amqqueue:get_name(Q) || Q <- Qs], + {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost), + AllQs = RecoveredClassic ++ FailedClassic ++ Quorum, + QNames = [amqqueue:get_name(Q) || Q <- AllQs], ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), - ok = rabbit_amqqueue:start(Qs), + ok = rabbit_amqqueue:start(RecoveredClassic), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. |
