summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-07-28 17:11:30 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-07-29 11:12:39 +0200
commited2766564f5e2cc23e62e139d435e2ea511d12dd (patch)
tree5e752c7384d07422f8a0a504d0addb5aeb0a1063 /src
parentac5373f34ec1dac6ad338d14580d28c7c2d4132e (diff)
downloadrabbitmq-server-git-ed2766564f5e2cc23e62e139d435e2ea511d12dd.tar.gz
GM: Wait for messages to be ACK'd before exiting
When the GM is asked to stop, it flushes the pending messages but does not ensure those messages are actually delivered to all members. In some situations, this can lead to other GMs detecting that this GM is unexpectedly down, even if this GM just sent a message to ask them to stop as well. When this happens in the case of queue mirroring for instance, this causes a slave to promote itself as the new master, and the old master to wait forever that other slaves exit. Fixes #224.
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl63
1 files changed, 57 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 95dc84e41b..b6ea0fddca 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -420,7 +420,8 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
- txn_executor
+ txn_executor,
+ shutting_down
}).
-record(gm_group, { name, version, members }).
@@ -567,10 +568,18 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ txn_executor = TxnFun,
+ shutting_down = false }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+handle_call({confirmed_broadcast, Msg}, _From,
+ State = #state { shutting_down = {true, _} }) ->
+ rabbit_log:info(
+ "GM ~p: Dropping confirmed_broadcast, shutting down in progress:~n~p~n",
+ [self(), Msg]),
+ reply(shutting_down, State);
+
handle_call({confirmed_broadcast, _Msg}, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
@@ -645,6 +654,13 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+handle_cast({broadcast, Msg, _SizeHint},
+ State = #state { shutting_down = {true, _} }) ->
+ rabbit_log:info(
+ "GM ~p: Dropping broadcast, shutting down in progress:~n~p~n",
+ [self(), Msg]),
+ noreply(State);
+
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { members_state = undefined }) ->
noreply(State);
@@ -744,7 +760,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
terminate(Reason, State = #state { module = Module,
callback_args = Args }) ->
- flush_broadcast_buffer(State),
+ rabbit_log:info("GM ~p: Exiting with state: ~p~n", [self(), State]),
Module:handle_terminate(Args, Reason).
@@ -1427,15 +1443,50 @@ activity_true(_Result, Activity, State = #state { module = Module,
activity_false(Result, _Activity, State) ->
{Result, State}.
-if_callback_success(ok, True, _False, Arg, State) ->
+if_callback_success(Result, True, False, Arg, State) ->
+ {NewResult, NewState} = maybe_stop(Result, State),
+ if_callback_success1(NewResult, True, False, Arg, NewState).
+
+if_callback_success1(ok, True, _False, Arg, State) ->
True(ok, Arg, State);
-if_callback_success(
+if_callback_success1(
{become, Module, Args} = Result, True, _False, Arg, State) ->
True(Result, Arg, State #state { module = Module,
callback_args = Args });
-if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
False(Result, Arg, State).
+maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
+ ShuttingDown = {true, Reason},
+ case has_pending_messages(State) of
+ true -> rabbit_log:info(
+ "GM ~p: Pending messages, stop delayed (a)~n", [self()]),
+ {ok, State #state{ shutting_down = ShuttingDown }};
+ false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
+ end;
+maybe_stop(Result, #state{ shutting_down = false } = State) ->
+ {Result, State};
+maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
+ case has_pending_messages(State) of
+ true -> rabbit_log:info(
+ "GM ~p: Pending messages, stop delayed (b)~n", [self()]),
+ {Result, State};
+ false -> {{stop, Reason}, State}
+ end.
+
+has_pending_messages(#state{ broadcast_buffer = Buffer })
+ when Buffer =/= [] ->
+ rabbit_log:info("GM ~p: Has pending messages in broadcast_buffer? true~n",
+ [self()]),
+ true;
+has_pending_messages(#state{ members_state = MembersState }) ->
+ Ret = ([] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M}
+ <- MembersState,
+ LP =/= LA]),
+ rabbit_log:info("GM ~p: Has pending messages in members_state? ~p~n",
+ [self(), Ret]),
+ Ret.
+
maybe_confirm(_Self, _Id, Confirms, []) ->
Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->