summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-05-25 21:26:34 +0100
committerGitHub <noreply@github.com>2018-05-25 21:26:34 +0100
commit0becb6bf46facd60705c4697e0f59ef71d0d0f1a (patch)
tree2049c552dbf0eff5506a95b31baeb0b68f41f642
parentdd186dbf5e8cfa9b6b3efe4902fb27a07463b751 (diff)
parentf7fc5b1f28d2bf6f02d20a20042e4d75e5b2247f (diff)
downloadrabbitmq-server-git-0becb6bf46facd60705c4697e0f59ef71d0d0f1a.tar.gz
Merge pull request #1608 from rabbitmq/rabbitmq-management-575
Introduce rabbit_vhost:await_running_on_all_nodes/2
-rw-r--r--src/rabbit_vhost.erl57
1 files changed, 42 insertions, 15 deletions
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 73c05389be..7360f8bc95 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -22,7 +22,7 @@
-export([recover/0, recover/1]).
-export([add/2, delete/2, exists/1, list/0, with/2, with_user_and_vhost/3, assert/1, update/2,
- set_limits/2, limits_of/1]).
+ set_limits/2, limits_of/1, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
@@ -93,7 +93,7 @@ add(VHostPath, ActingUser) ->
fun (ok, true) ->
ok;
(ok, false) ->
- [rabbit_exchange:declare(
+ [_ = rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, Internal, [], ActingUser) ||
{Name, Type, Internal} <-
@@ -148,6 +148,45 @@ delete(VHostPath, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
+%% 50 ms
+-define(AWAIT_SAMPLE_INTERVAL, 50).
+
+-spec await_running_on_all_nodes(rabbit_types:vhost(), integer()) -> ok | {error, timeout}.
+await_running_on_all_nodes(VHost, Timeout) ->
+ Attempts = ceil(Timeout / ?AWAIT_SAMPLE_INTERVAL),
+ await_running_on_all_nodes0(VHost, Attempts).
+
+await_running_on_all_nodes0(_VHost, 0) ->
+ {error, timeout};
+await_running_on_all_nodes0(VHost, Attempts) ->
+ case is_running_on_all_nodes(VHost) of
+ true -> ok;
+ _ ->
+ timer:sleep(?AWAIT_SAMPLE_INTERVAL),
+ await_running_on_all_nodes0(VHost, Attempts - 1)
+ end.
+
+-spec is_running_on_all_nodes(rabbit_types:vhost()) -> boolean().
+is_running_on_all_nodes(VHost) ->
+ States = vhost_cluster_state(VHost),
+ lists:all(fun ({_Node, State}) -> State =:= running end,
+ States).
+
+-spec vhost_cluster_state(rabbit_types:vhost()) -> [{atom(), atom()}].
+vhost_cluster_state(VHost) ->
+ Nodes = rabbit_nodes:all_running(),
+ lists:map(fun(Node) ->
+ State = case rabbit_misc:rpc_call(Node,
+ rabbit_vhost_sup_sup, is_vhost_alive,
+ [VHost]) of
+ {badrpc, nodedown} -> nodedown;
+ true -> running;
+ false -> stopped
+ end,
+ {Node, State}
+ end,
+ Nodes).
+
vhost_down(VHostPath) ->
ok = rabbit_event:notify(vhost_down,
[{name, VHostPath},
@@ -263,19 +302,7 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
i(tracing, VHost) -> rabbit_trace:enabled(VHost);
-i(cluster_state, VHost) ->
- Nodes = rabbit_nodes:all_running(),
- lists:map(fun(Node) ->
- State = case rabbit_misc:rpc_call(Node,
- rabbit_vhost_sup_sup, is_vhost_alive,
- [VHost]) of
- {badrpc, nodedown} -> nodedown;
- true -> running;
- false -> stopped
- end,
- {Node, State}
- end,
- Nodes);
+i(cluster_state, VHost) -> vhost_cluster_state(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).