summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-12-20 18:38:40 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2016-12-20 18:38:40 +0000
commit308a2b5f5603b20906de2bec0d7d80ff99073e86 (patch)
tree83510fb6ce02038617df748e16527e89de782d9f
parent5f7553c0bd2eac3fc311761bc2b81763f537c1e5 (diff)
downloadrabbitmq-server-git-308a2b5f5603b20906de2bec0d7d80ff99073e86.tar.gz
Group recovery client refs by vhost
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl16
-rw-r--r--src/rabbit_variable_queue.erl27
-rw-r--r--test/channel_operation_timeout_test_queue.erl27
3 files changed, 48 insertions, 22 deletions
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
index 65bf2c74e8..3aef5e7428 100644
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -8,28 +8,32 @@
%% Internal
-export([start_store_for_vhost/4]).
-start_link(Name, ClientRefs, StartupFunState) ->
+start_link(Name, VhostsClientRefs, StartupFunState) ->
supervisor2:start_link({local, Name}, ?MODULE,
- [Name, ClientRefs, StartupFunState]).
+ [Name, VhostsClientRefs, StartupFunState]).
-init([Name, ClientRefs, StartupFunState]) ->
+init([Name, VhostsClientRefs, StartupFunState]) ->
ets:new(Name, [named_table, public]),
{ok, {{simple_one_for_one, 1, 1},
[{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost,
- [Name, ClientRefs, StartupFunState]},
+ [Name, VhostsClientRefs, StartupFunState]},
transient, infinity, supervisor, [rabbit_msg_store]}]}}.
add_vhost(Name, VHost) ->
supervisor2:start_child(Name, [VHost]).
-start_store_for_vhost(Name, ClientRefs, StartupFunState, VHost) ->
+start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) ->
case vhost_store_pid(Name, VHost) of
no_pid ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
ok = rabbit_file:ensure_dir(VHostDir),
rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
- case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of
+ VhostRefs = case maps:find(VHost, VhostsClientRefs) of
+ {ok, Refs} -> Refs;
+ error -> []
+ end,
+ case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of
{ok, Pid} ->
ets:insert(Name, {VHost, Pid}),
{ok, Pid};
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9dcb2eef82..43e0d30a4e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -465,14 +465,25 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
- start_msg_store(
- [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
+ %% Group recovery terms by vhost.
+ {[], VhostRefs} = lists:foldl(
+ fun
+ %% We need to skip a queue name
+ (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
+ {QNames, VhostRefs};
+ (Terms, {[QueueName | QNames], VhostRefs}) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {QNames, VhostRefs};
+ Ref ->
+ #resource{virtual_host = VHost} = QueueName,
+ Refs = case maps:find(VHost, VhostRefs) of
+ {ok, Val} -> Val;
+ error -> []
+ end,
+ {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
+ end
+ end),
+ start_msg_store(VhostRefs, StartFunState),
{ok, AllTerms}.
stop() ->
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 5e256b1381..5a41eb86bf 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -215,14 +215,25 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
- start_msg_store(
- [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
+ %% Group recovery terms by vhost.
+ {[], VhostRefs} = lists:foldl(
+ fun
+ %% We need to skip a queue name
+ (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
+ {QNames, VhostRefs};
+ (Terms, {[QueueName | QNames], VhostRefs}) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {QNames, VhostRefs};
+ Ref ->
+ #resource{virtual_host = VHost} = QueueName,
+ Refs = case maps:find(VHost, VhostRefs) of
+ {ok, Val} -> Val;
+ error -> []
+ end,
+ {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
+ end
+ end),
+ start_msg_store(VhostRefs, StartFunState),
{ok, AllTerms}.
stop() ->