diff options
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 18 |
6 files changed, 73 insertions, 18 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c13868030a..7a40f9ebf0 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -52,6 +52,7 @@ arguments, %% immutable pid, %% durable (just so we know home node) slave_pids, sync_slave_pids, %% transient + down_slave_nodes, %% durable policy, %% durable, implicit update as above gm_pids, %% transient decorators}). %% transient, recalculated as above diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a1d162a7a..7a10d239f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). --export([on_node_down/1]). +-export([on_node_up/1, on_node_down/1]). -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -174,6 +174,7 @@ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). @@ -689,6 +690,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [case lists:member(Node, DSNs) of + true -> DSNs1 = DSNs -- [Node], + store_queue( + Q#amqqueue{down_slave_nodes = DSNs1}); + false -> ok + end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + ok + end). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -724,12 +739,13 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -immutable(Q) -> Q#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - gm_pids = none, - policy = none, - decorators = none}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + down_slave_nodes = none, + gm_pids = none, + policy = none, + decorators = none}. deliver([], _Delivery, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 97206df350..4082c53d33 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -84,6 +84,7 @@ memory, slave_pids, synchronised_slave_pids, + down_slave_nodes, backing_queue_status, state ]). @@ -810,6 +811,14 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(down_slave_nodes, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = + rabbit_amqqueue:lookup(Name), + case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> Nodes + end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 7aec1ac81f..9e8c4a1891 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -78,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids, + down_slave_nodes = DSNs}] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -89,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], + DSNs1 = [node(Pid) || + Pid <- SPids, + not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> @@ -97,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and @@ -109,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1) end, {ok, QPid1, DeadPids} @@ -239,12 +245,16 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{slave_pids = SPids, - sync_slave_pids = SSPids}) -> +store_updated_slaves(Q = #amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids, + down_slave_nodes = DSNs}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + down_slave_nodes = DSNs1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 1496147848..1c971c1da8 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -415,6 +415,7 @@ ensure_ping_timer(State) -> State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). handle_live_rabbit(Node) -> + ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b6d378525e..1104f3731a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -48,6 +48,7 @@ -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). +-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). %% ------------------------------------------------------------------- @@ -77,6 +78,8 @@ -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). -spec(internal_system_x/0 :: () -> 'ok'). +-spec(cluster_name/0 :: () -> 'ok'). +-spec(down_slave_nodes/0 :: () -> 'ok'). -endif. @@ -382,6 +385,21 @@ cluster_name_tx() -> [mnesia:delete(T, K, write) || K <- Ks], ok. +down_slave_nodes() -> + ok = down_slave_nodes(rabbit_queue), + ok = down_slave_nodes(rabbit_durable_queue). + +down_slave_nodes(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
