summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:28:49 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:28:49 +0100
commitd8a5d27d601cbb008a244ebaec490d538b174189 (patch)
treeab948184f690adeee6a274ea322ee9768d0170df /src
parent26f19a76d71565ff1f46567e0abb77e54d0a4879 (diff)
downloadrabbitmq-server-git-d8a5d27d601cbb008a244ebaec490d538b174189.tar.gz
Separate out different is_process_alive implementations depending on whether we want to consider the process alive if it is running but we can't talk to it via Mnesia. Thus unbreak exclusive queues with the direct client from non-Rabbit nodes.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_prequeue.erl4
5 files changed, 23 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c4abfd9d61..68e967425f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -398,7 +398,7 @@ with(Name, F, E) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_misc:is_process_alive(QPid),
+ fun () -> false = rabbit_mnesia:is_process_alive(QPid),
timer:sleep(25),
with(Name, F, E)
end, fun () -> F(Q) end);
@@ -772,7 +772,7 @@ on_node_down(Node) ->
slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node andalso
- not rabbit_misc:is_process_alive(Pid)])),
+ not rabbit_mnesia:is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index af1e21414c..58fbcbe074 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -167,11 +167,11 @@ init_it(Self, GM, Node, QName) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
{new, QPid, GMPids};
- [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
end;
- [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
true -> existing;
false -> GMPids1 = [T || T = {_, S} <- GMPids,
S =/= SPid],
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dd4d5c76ef..2bd81a86eb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -852,9 +852,14 @@ ntoab(IP) ->
%% We try to avoid reconnecting to down nodes here; this is used in a
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
+%%
+%% See also rabbit_mnesia:is_process_alive/1 which also requires the
+%% process be in the same running cluster as us (i.e. not partitioned
+%% or some random node).
is_process_alive(Pid) ->
- rabbit_mnesia:on_running_node(Pid) andalso
- rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+ Node = node(Pid),
+ lists:member(Node, [node() | nodes()]) andalso
+ rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 19fd01a1a7..fa51dd70de 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -28,6 +28,7 @@
status/0,
is_clustered/0,
on_running_node/1,
+ is_process_alive/1,
cluster_nodes/1,
node_type/0,
dir/0,
@@ -73,6 +74,7 @@
{'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(on_running_node/1 :: (pid()) -> boolean()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
@@ -340,6 +342,14 @@ is_clustered() -> AllNodes = cluster_nodes(all),
on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).
+%% This requires the process be in the same running cluster as us
+%% (i.e. not partitioned or some random node).
+%%
+%% See also rabbit_misc:is_process_alive/1 which does not.
+is_process_alive(Pid) ->
+ on_running_node(Pid) andalso
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+
cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index e7c2b5e43a..16e30cac11 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -66,8 +66,8 @@ init(#amqqueue{name = QueueName}, restart) ->
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
- Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
- case rabbit_misc:is_process_alive(QPid) of
+ Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
+ case rabbit_mnesia:is_process_alive(QPid) of
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]