diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2018-05-24 07:51:06 +0100 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2018-05-24 07:52:32 +0100 |
| commit | f7fc5b1f28d2bf6f02d20a20042e4d75e5b2247f (patch) | |
| tree | de586679e6b37bc3655ea3f86fdae158b1d221eb | |
| parent | e73d8caa01a2c2636d6bc03f61babdf0eaba5396 (diff) | |
| download | rabbitmq-server-git-f7fc5b1f28d2bf6f02d20a20042e4d75e5b2247f.tar.gz | |
Introduce rabbit_vhost:await_running_on_all_nodes/2
Part of rabbitmq/rabbitmq-management#575.
[#157817330]
| -rw-r--r-- | src/rabbit_vhost.erl | 57 |
1 files changed, 42 insertions, 15 deletions
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 73c05389be..7360f8bc95 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -22,7 +22,7 @@ -export([recover/0, recover/1]). -export([add/2, delete/2, exists/1, list/0, with/2, with_user_and_vhost/3, assert/1, update/2, - set_limits/2, limits_of/1]). + set_limits/2, limits_of/1, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). @@ -93,7 +93,7 @@ add(VHostPath, ActingUser) -> fun (ok, true) -> ok; (ok, false) -> - [rabbit_exchange:declare( + [_ = rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, Internal, [], ActingUser) || {Name, Type, Internal} <- @@ -148,6 +148,45 @@ delete(VHostPath, ActingUser) -> rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. +%% 50 ms +-define(AWAIT_SAMPLE_INTERVAL, 50). + +-spec await_running_on_all_nodes(rabbit_types:vhost(), integer()) -> ok | {error, timeout}. +await_running_on_all_nodes(VHost, Timeout) -> + Attempts = ceil(Timeout / ?AWAIT_SAMPLE_INTERVAL), + await_running_on_all_nodes0(VHost, Attempts). + +await_running_on_all_nodes0(_VHost, 0) -> + {error, timeout}; +await_running_on_all_nodes0(VHost, Attempts) -> + case is_running_on_all_nodes(VHost) of + true -> ok; + _ -> + timer:sleep(?AWAIT_SAMPLE_INTERVAL), + await_running_on_all_nodes0(VHost, Attempts - 1) + end. + +-spec is_running_on_all_nodes(rabbit_types:vhost()) -> boolean(). +is_running_on_all_nodes(VHost) -> + States = vhost_cluster_state(VHost), + lists:all(fun ({_Node, State}) -> State =:= running end, + States). + +-spec vhost_cluster_state(rabbit_types:vhost()) -> [{atom(), atom()}]. +vhost_cluster_state(VHost) -> + Nodes = rabbit_nodes:all_running(), + lists:map(fun(Node) -> + State = case rabbit_misc:rpc_call(Node, + rabbit_vhost_sup_sup, is_vhost_alive, + [VHost]) of + {badrpc, nodedown} -> nodedown; + true -> running; + false -> stopped + end, + {Node, State} + end, + Nodes). + vhost_down(VHostPath) -> ok = rabbit_event:notify(vhost_down, [{name, VHostPath}, @@ -263,19 +302,7 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, VHost) -> VHost; i(tracing, VHost) -> rabbit_trace:enabled(VHost); -i(cluster_state, VHost) -> - Nodes = rabbit_nodes:all_running(), - lists:map(fun(Node) -> - State = case rabbit_misc:rpc_call(Node, - rabbit_vhost_sup_sup, is_vhost_alive, - [VHost]) of - {badrpc, nodedown} -> nodedown; - true -> running; - false -> stopped - end, - {Node, State} - end, - Nodes); +i(cluster_state, VHost) -> vhost_cluster_state(VHost); i(Item, _) -> throw({bad_argument, Item}). info(VHost) -> infos(?INFO_KEYS, VHost). |
