diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-07-28 17:11:30 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-07-29 11:12:39 +0200 |
| commit | ed2766564f5e2cc23e62e139d435e2ea511d12dd (patch) | |
| tree | 5e752c7384d07422f8a0a504d0addb5aeb0a1063 | |
| parent | ac5373f34ec1dac6ad338d14580d28c7c2d4132e (diff) | |
| download | rabbitmq-server-git-ed2766564f5e2cc23e62e139d435e2ea511d12dd.tar.gz | |
GM: Wait for messages to be ACK'd before exiting
When the GM is asked to stop, it flushes the pending messages but
does not ensure those messages are actually delivered to all members.
In some situations, this can lead other GMs to detect that this GM is
unexpectedly down, even though this GM has just sent them a message asking
them to stop as well.
When this happens with queue mirroring, for instance, a slave promotes
itself to be the new master, and the old master waits forever for the
other slaves to exit.
Fixes #224.
| -rw-r--r-- | src/gm.erl | 63 |
1 files changed, 57 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl index 95dc84e41b..b6ea0fddca 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -420,7 +420,8 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, - txn_executor + txn_executor, + shutting_down }). -record(gm_group, { name, version, members }). @@ -567,10 +568,18 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + txn_executor = TxnFun, + shutting_down = false }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +handle_call({confirmed_broadcast, Msg}, _From, + State = #state { shutting_down = {true, _} }) -> + rabbit_log:info( + "GM ~p: Dropping confirmed_broadcast, shutting down in progress:~n~p~n", + [self(), Msg]), + reply(shutting_down, State); + handle_call({confirmed_broadcast, _Msg}, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); @@ -645,6 +654,13 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); +handle_cast({broadcast, Msg, _SizeHint}, + State = #state { shutting_down = {true, _} }) -> + rabbit_log:info( + "GM ~p: Dropping broadcast, shutting down in progress:~n~p~n", + [self(), Msg]), + noreply(State); + handle_cast({broadcast, _Msg, _SizeHint}, State = #state { members_state = undefined }) -> noreply(State); @@ -744,7 +760,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, terminate(Reason, State = #state { module = Module, callback_args = Args }) -> - flush_broadcast_buffer(State), + rabbit_log:info("GM ~p: Exiting with state: ~p~n", [self(), State]), Module:handle_terminate(Args, Reason). @@ -1427,15 +1443,50 @@ activity_true(_Result, Activity, State = #state { module = Module, activity_false(Result, _Activity, State) -> {Result, State}. 
-if_callback_success(ok, True, _False, Arg, State) -> +if_callback_success(Result, True, False, Arg, State) -> + {NewResult, NewState} = maybe_stop(Result, State), + if_callback_success1(NewResult, True, False, Arg, NewState). + +if_callback_success1(ok, True, _False, Arg, State) -> True(ok, Arg, State); -if_callback_success( +if_callback_success1( {become, Module, Args} = Result, True, _False, Arg, State) -> True(Result, Arg, State #state { module = Module, callback_args = Args }); -if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> +if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) -> False(Result, Arg, State). +maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) -> + ShuttingDown = {true, Reason}, + case has_pending_messages(State) of + true -> rabbit_log:info( + "GM ~p: Pending messages, stop delayed (a)~n", [self()]), + {ok, State #state{ shutting_down = ShuttingDown }}; + false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }} + end; +maybe_stop(Result, #state{ shutting_down = false } = State) -> + {Result, State}; +maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) -> + case has_pending_messages(State) of + true -> rabbit_log:info( + "GM ~p: Pending messages, stop delayed (b)~n", [self()]), + {Result, State}; + false -> {{stop, Reason}, State} + end. + +has_pending_messages(#state{ broadcast_buffer = Buffer }) + when Buffer =/= [] -> + rabbit_log:info("GM ~p: Has pending messages in broadcast_buffer? true~n", + [self()]), + true; +has_pending_messages(#state{ members_state = MembersState }) -> + Ret = ([] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M} + <- MembersState, + LP =/= LA]), + rabbit_log:info("GM ~p: Has pending messages in members_state? ~p~n", + [self(), Ret]), + Ret. + maybe_confirm(_Self, _Id, Confirms, []) -> Confirms; maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> |
