summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gm.erl63
1 files changed, 57 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 95dc84e41b..b6ea0fddca 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -420,7 +420,8 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
- txn_executor
+ txn_executor,
+ shutting_down
}).
-record(gm_group, { name, version, members }).
@@ -567,10 +568,18 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ txn_executor = TxnFun,
+ shutting_down = false }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+handle_call({confirmed_broadcast, Msg}, _From,
+ State = #state { shutting_down = {true, _} }) ->
+ rabbit_log:info(
+ "GM ~p: Dropping confirmed_broadcast, shutting down in progress:~n~p~n",
+ [self(), Msg]),
+ reply(shutting_down, State);
+
handle_call({confirmed_broadcast, _Msg}, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
@@ -645,6 +654,13 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+handle_cast({broadcast, Msg, _SizeHint},
+ State = #state { shutting_down = {true, _} }) ->
+ rabbit_log:info(
+ "GM ~p: Dropping broadcast, shutting down in progress:~n~p~n",
+ [self(), Msg]),
+ noreply(State);
+
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { members_state = undefined }) ->
noreply(State);
@@ -744,7 +760,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
terminate(Reason, State = #state { module = Module,
callback_args = Args }) ->
- flush_broadcast_buffer(State),
+ rabbit_log:info("GM ~p: Exiting with state: ~p~n", [self(), State]),
Module:handle_terminate(Args, Reason).
@@ -1427,15 +1443,50 @@ activity_true(_Result, Activity, State = #state { module = Module,
activity_false(Result, _Activity, State) ->
{Result, State}.
-if_callback_success(ok, True, _False, Arg, State) ->
+if_callback_success(Result, True, False, Arg, State) ->
+ {NewResult, NewState} = maybe_stop(Result, State),
+ if_callback_success1(NewResult, True, False, Arg, NewState).
+
+if_callback_success1(ok, True, _False, Arg, State) ->
True(ok, Arg, State);
-if_callback_success(
+if_callback_success1(
{become, Module, Args} = Result, True, _False, Arg, State) ->
True(Result, Arg, State #state { module = Module,
callback_args = Args });
-if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
False(Result, Arg, State).
+maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
+ ShuttingDown = {true, Reason},
+ case has_pending_messages(State) of
+ true -> rabbit_log:info(
+ "GM ~p: Pending messages, stop delayed (a)~n", [self()]),
+ {ok, State #state{ shutting_down = ShuttingDown }};
+ false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
+ end;
+maybe_stop(Result, #state{ shutting_down = false } = State) ->
+ {Result, State};
+maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
+ case has_pending_messages(State) of
+ true -> rabbit_log:info(
+ "GM ~p: Pending messages, stop delayed (b)~n", [self()]),
+ {Result, State};
+ false -> {{stop, Reason}, State}
+ end.
+
+has_pending_messages(#state{ broadcast_buffer = Buffer })
+ when Buffer =/= [] ->
+ rabbit_log:info("GM ~p: Has pending messages in broadcast_buffer? true~n",
+ [self()]),
+ true;
+has_pending_messages(#state{ members_state = MembersState }) ->
+ Ret = ([] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M}
+ <- MembersState,
+ LP =/= LA]),
+ rabbit_log:info("GM ~p: Has pending messages in members_state? ~p~n",
+ [self(), Ret]),
+ Ret.
+
maybe_confirm(_Self, _Id, Confirms, []) ->
Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->