diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-12-20 18:38:40 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-12-20 18:38:40 +0000 |
| commit | 308a2b5f5603b20906de2bec0d7d80ff99073e86 (patch) | |
| tree | 83510fb6ce02038617df748e16527e89de782d9f | |
| parent | 5f7553c0bd2eac3fc311761bc2b81763f537c1e5 (diff) | |
| download | rabbitmq-server-git-308a2b5f5603b20906de2bec0d7d80ff99073e86.tar.gz | |
Group recovery client refs by vhost
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 27 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 27 |
3 files changed, 48 insertions, 22 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index 65bf2c74e8..3aef5e7428 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -8,28 +8,32 @@ %% Internal -export([start_store_for_vhost/4]). -start_link(Name, ClientRefs, StartupFunState) -> +start_link(Name, VhostsClientRefs, StartupFunState) -> supervisor2:start_link({local, Name}, ?MODULE, - [Name, ClientRefs, StartupFunState]). + [Name, VhostsClientRefs, StartupFunState]). -init([Name, ClientRefs, StartupFunState]) -> +init([Name, VhostsClientRefs, StartupFunState]) -> ets:new(Name, [named_table, public]), {ok, {{simple_one_for_one, 1, 1}, [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, - [Name, ClientRefs, StartupFunState]}, + [Name, VhostsClientRefs, StartupFunState]}, transient, infinity, supervisor, [rabbit_msg_store]}]}}. add_vhost(Name, VHost) -> supervisor2:start_child(Name, [VHost]). -start_store_for_vhost(Name, ClientRefs, StartupFunState, VHost) -> +start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) -> case vhost_store_pid(Name, VHost) of no_pid -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), ok = rabbit_file:ensure_dir(VHostDir), rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), - case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of + VhostRefs = case maps:find(VHost, VhostsClientRefs) of + {ok, Refs} -> Refs; + error -> [] + end, + case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of {ok, Pid} -> ets:insert(Name, {VHost, Pid}), {ok, Pid}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9dcb2eef82..43e0d30a4e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -465,14 +465,25 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 5e256b1381..5a41eb86bf 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -215,14 +215,25 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> |
