diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2012-02-01 17:23:57 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2012-02-01 17:23:57 +0000 |
| commit | 9b6c99384e421f767d3b353ad9f0d793f3072d28 (patch) | |
| tree | 92e68de24b37ed1d291f5e676b6370fcb018d28b /src | |
| parent | abea5ffd58a2f5eae1b59f0d1c5e40378ff02a69 (diff) | |
| parent | f2a71b50bcd1c7b6f0bf9b8807d6bd46cc4c4a8d (diff) | |
| download | rabbitmq-server-git-9b6c99384e421f767d3b353ad9f0d793f3072d28.tar.gz | |
Merge heads
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 100 |
2 files changed, 62 insertions, 52 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index d1caf5aa68..226fbea073 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of - {ok, _Pid} -> ok; - _ -> Result + {ok, undefined} -> %% Already running + ok; + {ok, _Pid} -> + rabbit_log:info( + "Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + ok; + _ -> + Result end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2cdc763723..1fd927dda0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -90,7 +90,7 @@ }). start_link(Q) -> - gen_server2:start_link(?MODULE, [Q], []). + gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -98,55 +98,61 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init([#amqqueue { name = QueueName } = Q]) -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, +init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - {ok, MPid} = - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - %% ASSERTION - [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {ok, QPid} - end), - erlang:monitor(process, MPid), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [Self]), - ok = rabbit_memory_monitor:register( - Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [SPid] -> true = rabbit_misc:is_process_alive(SPid), + existing + end + end) of + {new, MPid} -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [Self]), + ok = rabbit_memory_monitor:register( + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}; + existing -> + ignore + end. handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> |
