summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-29 12:05:23 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-29 12:05:23 +0000
commitd663453d1e18dc36935557014756a6c7441f8c0b (patch)
treefff30b3cffa2c6cc0d8b33e7ee8c59a7304fed53
parent694611f2830d0a1882f7ea065bb552a043ef75d9 (diff)
downloadrabbitmq-server-git-d663453d1e18dc36935557014756a6c7441f8c0b.tar.gz
Only respond to 'EXIT's from parent / syncer.
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_sync.erl16
-rw-r--r--src/rabbit_misc.erl46
3 files changed, 62 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0820f3f9e4..2f9f4c02de 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -139,10 +139,13 @@ sync_mirrors(State = #state { name = QName,
gm:broadcast(GM, {sync_start, Ref, Syncer}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(Syncer, Ref, QName, BQ, BQS) of
- {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
- {ok, BQS1} -> rabbit_log:info("Synchronising ~s: complete~n",
- [rabbit_misc:rs(QName)]),
- {ok, S(BQS1)}
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> rabbit_log:info("Synchronising ~s: ~p~n",
+ [rabbit_misc:rs(QName), R]),
+ {ok, S(BQS1)};
+ {ok, BQS1} -> rabbit_log:info("Synchronising ~s: complete~n",
+ [rabbit_misc:rs(QName)]),
+ {ok, S(BQS1)}
end.
terminate({shutdown, dropped} = Reason,
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 9ff853d59b..94f5ae0f2b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -66,8 +66,9 @@ master_go(Syncer, Ref, QName, BQ, BQS) ->
{next, Ref} -> ok
end,
case Acc of
- {shutdown, Reason} -> {shutdown, Reason, BQS1};
- _ -> {ok, BQS1}
+ {shutdown, Reason} -> {shutdown, Reason, BQS1};
+ {sync_died, Reason} -> {sync_died, Reason, BQS1};
+ _ -> {ok, BQS1}
end.
master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) ->
@@ -78,10 +79,12 @@ master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) ->
erlang:now();
false -> Last
end},
+ Parent = rabbit_misc:get_parent(),
receive
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
- {cont, Acc};
- {'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}}
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {cont, Acc};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}};
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}
end.
%% Master
@@ -165,6 +168,7 @@ slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS1).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS) ->
+ Parent = rabbit_misc:get_parent(),
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -197,6 +201,6 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS) ->
Props1 = Props#message_properties{needs_confirming = false},
BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
slave_sync_loop(Args, TRef, BQS1);
- {'EXIT', _Pid, Reason} ->
+ {'EXIT', Parent, Reason} ->
{stop, Reason, {TRef, BQS}}
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81bb6769ab..cd83e3b88d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -66,6 +66,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
+-export([get_parent/0]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -239,7 +240,7 @@
-spec(interval_operation/4 ::
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-
+-spec(get_parent/0 :: () -> pid()).
-endif.
%%----------------------------------------------------------------------------
@@ -1034,3 +1035,46 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
{false, false} -> lists:max([IdealInterval,
round(LastInterval / 1.5)])
end}.
+
+%% -------------------------------------------------------------------------
+%% Begin copypasta from gen_server.erl
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid(Parent)->
+ Parent;
+ [Parent | _] when is_atom(Parent)->
+ name_to_pid(Parent);
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined ->
+ case whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true ->
+ Pid
+ end;
+ [] -> undefined
+ end.
+
+%% End copypasta from gen_server.erl
+%% -------------------------------------------------------------------------