diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-10-10 15:28:49 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-10-10 15:28:49 +0100 |
| commit | d8a5d27d601cbb008a244ebaec490d538b174189 (patch) | |
| tree | ab948184f690adeee6a274ea322ee9768d0170df /src | |
| parent | 26f19a76d71565ff1f46567e0abb77e54d0a4879 (diff) | |
| download | rabbitmq-server-git-d8a5d27d601cbb008a244ebaec490d538b174189.tar.gz | |
Separate out different is_process_alive implementations depending on whether we want to consider the process alive if it is running but we can't talk to it via Mnesia. Thus unbreak exclusive queues with the direct client from non-Rabbit nodes.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_prequeue.erl | 4 |
5 files changed, 23 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c4abfd9d61..68e967425f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -398,7 +398,7 @@ with(Name, F, E) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_misc:is_process_alive(QPid), + fun () -> false = rabbit_mnesia:is_process_alive(QPid), timer:sleep(25), with(Name, F, E) end, fun () -> F(Q) end); @@ -772,7 +772,7 @@ on_node_down(Node) -> slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node andalso - not rabbit_misc:is_process_alive(Pid)])), + not rabbit_mnesia:is_process_alive(Pid)])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index af1e21414c..58fbcbe074 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -167,11 +167,11 @@ init_it(Self, GM, Node, QName) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> add_slave(Q, Self, GM), {new, QPid, GMPids}; - [QPid] -> case rabbit_misc:is_process_alive(QPid) of + [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} end; - [SPid] -> case rabbit_misc:is_process_alive(SPid) of + [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of true -> existing; false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid], diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dd4d5c76ef..2bd81a86eb 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -852,9 +852,14 @@ ntoab(IP) -> %% We try to avoid reconnecting to down nodes here; this is used in a %% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur %% would be bad news. +%% +%% See also rabbit_mnesia:is_process_alive/1 which also requires the +%% process be in the same running cluster as us (i.e. not partitioned +%% or some random node). is_process_alive(Pid) -> - rabbit_mnesia:on_running_node(Pid) andalso - rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true. + Node = node(Pid), + lists:member(Node, [node() | nodes()]) andalso + rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true. pget(K, P) -> proplists:get_value(K, P). pget(K, P, D) -> proplists:get_value(K, P, D). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 19fd01a1a7..fa51dd70de 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -28,6 +28,7 @@ status/0, is_clustered/0, on_running_node/1, + is_process_alive/1, cluster_nodes/1, node_type/0, dir/0, @@ -73,6 +74,7 @@ {'partitions', [{node(), [node()]}]}]). -spec(is_clustered/0 :: () -> boolean()). -spec(on_running_node/1 :: (pid()) -> boolean()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). -spec(node_type/0 :: () -> node_type()). -spec(dir/0 :: () -> file:filename()). @@ -340,6 +342,14 @@ is_clustered() -> AllNodes = cluster_nodes(all), on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)). +%% This requires the process be in the same running cluster as us +%% (i.e. not partitioned or some random node). +%% +%% See also rabbit_misc:is_process_alive/1 which does not. +is_process_alive(Pid) -> + on_running_node(Pid) andalso + rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true. + cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% This function is the actual source of information, since it gets diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index e7c2b5e43a..16e30cac11 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -66,8 +66,8 @@ init(#amqqueue{name = QueueName}, restart) -> slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), LocalOrMasterDown = node(QPid) =:= node() orelse not rabbit_mnesia:on_running_node(QPid), - Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], - case rabbit_misc:is_process_alive(QPid) of + Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], + case rabbit_mnesia:is_process_alive(QPid) of true -> false = LocalOrMasterDown, %% assertion rabbit_mirror_queue_slave:go(self(), async), rabbit_mirror_queue_slave:init(Q); %% [1] |
