summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl32
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_upgrade_functions.erl18
6 files changed, 73 insertions, 18 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c13868030a..7a40f9ebf0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -52,6 +52,7 @@
arguments, %% immutable
pid, %% durable (just so we know home node)
slave_pids, sync_slave_pids, %% transient
+ down_slave_nodes, %% durable
policy, %% durable, implicit update as above
gm_pids, %% transient
decorators}). %% transient, recalculated as above
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8a1d162a7a..7a10d239f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
--export([on_node_down/1]).
+-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -174,6 +174,7 @@
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
@@ -689,6 +690,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [case lists:member(Node, DSNs) of
+ true -> DSNs1 = DSNs -- [Node],
+ store_queue(
+ Q#amqqueue{down_slave_nodes = DSNs1});
+ false -> ok
+ end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ ok
+ end).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -724,12 +739,13 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-immutable(Q) -> Q#amqqueue{pid = none,
- slave_pids = none,
- sync_slave_pids = none,
- gm_pids = none,
- policy = none,
- decorators = none}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ down_slave_nodes = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 97206df350..4082c53d33 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,6 +84,7 @@
memory,
slave_pids,
synchronised_slave_pids,
+ down_slave_nodes,
backing_queue_status,
state
]).
@@ -810,6 +811,14 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(down_slave_nodes, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+ rabbit_amqqueue:lookup(Name),
+ case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> Nodes
+ end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 7aec1ac81f..9e8c4a1891 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -78,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids,
+ down_slave_nodes = DSNs}] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -89,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
+ DSNs1 = [node(Pid) ||
+ Pid <- SPids,
+ not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -97,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
@@ -109,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1)
end,
{ok, QPid1, DeadPids}
@@ -239,12 +245,16 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+store_updated_slaves(Q = #amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ down_slave_nodes = DSNs}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ down_slave_nodes = DSNs1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1496147848..1c971c1da8 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -415,6 +415,7 @@ ensure_ping_timer(State) ->
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
+ ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b6d378525e..1104f3731a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,6 +48,7 @@
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
%% -------------------------------------------------------------------
@@ -77,6 +78,8 @@
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
+-spec(cluster_name/0 :: () -> 'ok').
+-spec(down_slave_nodes/0 :: () -> 'ok').
-endif.
@@ -382,6 +385,21 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+down_slave_nodes() ->
+ ok = down_slave_nodes(rabbit_queue),
+ ok = down_slave_nodes(rabbit_durable_queue).
+
+down_slave_nodes(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->