diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-29 12:05:23 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-29 12:05:23 +0000 |
| commit | d663453d1e18dc36935557014756a6c7441f8c0b (patch) | |
| tree | fff30b3cffa2c6cc0d8b33e7ee8c59a7304fed53 | |
| parent | 694611f2830d0a1882f7ea065bb552a043ef75d9 (diff) | |
| download | rabbitmq-server-git-d663453d1e18dc36935557014756a6c7441f8c0b.tar.gz | |
Only respond to 'EXIT's from parent / syncer.
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 46 |
3 files changed, 62 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0820f3f9e4..2f9f4c02de 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -139,10 +139,13 @@ sync_mirrors(State = #state { name = QName, gm:broadcast(GM, {sync_start, Ref, Syncer}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go(Syncer, Ref, QName, BQ, BQS) of - {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; - {ok, BQS1} -> rabbit_log:info("Synchronising ~s: complete~n", - [rabbit_misc:rs(QName)]), - {ok, S(BQS1)} + {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; + {sync_died, R, BQS1} -> rabbit_log:info("Synchronising ~s: ~p~n", + [rabbit_misc:rs(QName), R]), + {ok, S(BQS1)}; + {ok, BQS1} -> rabbit_log:info("Synchronising ~s: complete~n", + [rabbit_misc:rs(QName)]), + {ok, S(BQS1)} end. terminate({shutdown, dropped} = Reason, diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 9ff853d59b..94f5ae0f2b 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -66,8 +66,9 @@ master_go(Syncer, Ref, QName, BQ, BQS) -> {next, Ref} -> ok end, case Acc of - {shutdown, Reason} -> {shutdown, Reason, BQS1}; - _ -> {ok, BQS1} + {shutdown, Reason} -> {shutdown, Reason, BQS1}; + {sync_died, Reason} -> {sync_died, Reason, BQS1}; + _ -> {ok, BQS1} end. master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) -> @@ -78,10 +79,12 @@ master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) -> erlang:now(); false -> Last end}, + Parent = rabbit_misc:get_parent(), receive - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, - {cont, Acc}; - {'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}} + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {cont, Acc}; + {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}; + {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}} end. %% Master @@ -165,6 +168,7 @@ slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS1). slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS) -> + Parent = rabbit_misc:get_parent(), receive {'DOWN', MRef, process, Syncer, _Reason} -> %% If the master dies half way we are not in the usual @@ -197,6 +201,6 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS) -> Props1 = Props#message_properties{needs_confirming = false}, BQS1 = BQ:publish(Msg, Props1, true, none, BQS), slave_sync_loop(Args, TRef, BQS1); - {'EXIT', _Pid, Reason} -> + {'EXIT', Parent, Reason} -> {stop, Reason, {TRef, BQS}} end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81bb6769ab..cd83e3b88d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -66,6 +66,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). +-export([get_parent/0]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -239,7 +240,7 @@ -spec(interval_operation/4 :: ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). - +-spec(get_parent/0 :: () -> pid()). -endif. %%---------------------------------------------------------------------------- @@ -1034,3 +1035,46 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> {false, false} -> lists:max([IdealInterval, round(LastInterval / 1.5)]) end}. + +%% ------------------------------------------------------------------------- +%% Begin copypasta from gen_server.erl + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid(Parent)-> + Parent; + [Parent | _] when is_atom(Parent)-> + name_to_pid(Parent); + _ -> + exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> + case whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> + case erlang:is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> + Pid + end; + [] -> undefined + end. + +%% End copypasta from gen_server.erl +%% ------------------------------------------------------------------------- |
