diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 18:06:27 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 18:06:27 +0100 |
| commit | 746c1bf0404f71520a32bc972644af2b57cff7e9 (patch) | |
| tree | cfee3c0639d38dac9a5df4bbe4409b6c78030671 /src | |
| parent | 731e06ac54ae1d9b34ea19e1eadb225d6e3eda50 (diff) | |
| download | rabbitmq-server-git-746c1bf0404f71520a32bc972644af2b57cff7e9.tar.gz | |
Progress towards storing a GM pid -> queue pid mapping. This is dependent on not feeding the process_death message into remove_from_queue - bug 25195 suggests this might be TRTTD.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 |
5 files changed, 63 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index adab5d032c..466d3b8ab9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], - sync_slave_pids = []}), + sync_slave_pids = [], + gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c2bbcf9230..02b96629e9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -93,11 +93,17 @@ init(Q, Recover, AsyncCallback) -> BQS = BQ:init(Q, Recover, AsyncCallback), init_with_existing_bq(Q, BQ, BQS). -init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> +init_with_existing_bq(#amqqueue { name = QName, + gm_pids = []} = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + Q1 = Q#amqqueue{gm_pids = [{GM, self()}]}, + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = rabbit_amqqueue:store_queue(Q1) + end), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q1), rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, @@ -114,6 +120,7 @@ stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS }) -> unlink(CPid), + %% TODO remove GM from mnesia stop_all_slaves(shutdown, State), {BQ, BQS}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 453f2f2c68..1011a4ffbc 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -58,8 +58,8 @@ %% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadGMPids) -> - DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, + ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -67,10 +67,20 @@ remove_from_queue(QueueName, DeadGMPids) -> case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> - Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + slave_pids = SPids, + gm_pids = GMPids }] -> + + {Dead, GMPids1} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- Dead, Pid =/= existing], + {_, Alive} = lists:partition( + fun (Pid) -> + lists:member(Pid, DeadPids) + end, [QPid | SPids]), {QPid1, SPids1} = promote_slave(Alive), + case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, [], []}; @@ -80,7 +90,8 @@ remove_from_queue(QueueName, DeadGMPids) -> %% become the master. Q1 = store_updated_slaves( Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), + slave_pids = SPids1, + gm_pids = GMPids1 }), %% Sometimes a slave dying means we need %% to start more on other nodes - %% "exactly" mode can cause this to diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index fff02140d2..7c799dd815 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -105,7 +105,7 @@ init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QueueName) end) of + fun() -> init_it(Self, GM, Node, QueueName) end) of {new, MPid} -> erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( @@ -145,27 +145,37 @@ init(#amqqueue { name = QueueName } = Q) -> ignore end. -init_it(Self, Node, QueueName) -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = +init_it(Self, GM, Node, QueueName) -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids, gm_pids = GMPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> add_slave(Q1, Self, MPids), + [] -> add_slave(Q1, Self, GM), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> add_slave(Q1, Self, MPids -- [SPid]), + true -> Q2 = Q1#amqqueue{gm_pids = [{GM, existing} | + GMPids]}, + ok = rabbit_amqqueue:store_queue(Q2), + existing; + false -> add_slave(forget_slave(SPid, Q1), Self, GM), {new, QPid} end end. %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( - Q#amqqueue{slave_pids = MPids ++ [New]}). +add_slave(Q = #amqqueue{gm_pids = GMPids, slave_pids = SPids}, New, GM) -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = SPids ++ [New], + gm_pids = [{GM, New} | GMPids]}). + +forget_slave(SPid, Q = #amqqueue{slave_pids = SPids, + gm_pids = GMPids}) -> + Q#amqqueue{slave_pids = SPids -- [SPid], + gm_pids = [T || T = {S, _} <- GMPids, S =/= SPid]}. handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. @@ -355,7 +365,8 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; handle_msg([SPid], _From, {process_death, Pid}) -> - inform_deaths(SPid, [Pid]); + %%inform_deaths(SPid, [Pid]); TODO see bug25195 (?) + ok; handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index ddc9c565b1..21fdcd667b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -42,6 +42,7 @@ [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). +-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). %% ------------------------------------------------------------------- @@ -66,6 +67,7 @@ -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). -endif. @@ -268,6 +270,19 @@ no_mirror_nodes() -> || T <- Tables], ok. +gm_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddGMPidsFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []} + end, + [ok = transform(T, AddGMPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy, gm_pids]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- |
