diff options
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 27 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 27 |
3 files changed, 48 insertions, 22 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index 65bf2c74e8..3aef5e7428 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -8,28 +8,32 @@ %% Internal -export([start_store_for_vhost/4]). -start_link(Name, ClientRefs, StartupFunState) -> +start_link(Name, VhostsClientRefs, StartupFunState) -> supervisor2:start_link({local, Name}, ?MODULE, - [Name, ClientRefs, StartupFunState]). + [Name, VhostsClientRefs, StartupFunState]). -init([Name, ClientRefs, StartupFunState]) -> +init([Name, VhostsClientRefs, StartupFunState]) -> ets:new(Name, [named_table, public]), {ok, {{simple_one_for_one, 1, 1}, [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, - [Name, ClientRefs, StartupFunState]}, + [Name, VhostsClientRefs, StartupFunState]}, transient, infinity, supervisor, [rabbit_msg_store]}]}}. add_vhost(Name, VHost) -> supervisor2:start_child(Name, [VHost]). -start_store_for_vhost(Name, ClientRefs, StartupFunState, VHost) -> +start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) -> case vhost_store_pid(Name, VHost) of no_pid -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), ok = rabbit_file:ensure_dir(VHostDir), rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), - case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of + VhostRefs = case maps:find(VHost, VhostsClientRefs) of + {ok, Refs} -> Refs; + error -> [] + end, + case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of {ok, Pid} -> ets:insert(Name, {VHost, Pid}), {ok, Pid}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9dcb2eef82..43e0d30a4e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -465,14 +465,25 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 5e256b1381..5a41eb86bf 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -215,14 +215,25 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> |
