diff options
| -rw-r--r-- | src/gm.erl | 63 |
1 files changed, 57 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl index 95dc84e41b..b6ea0fddca 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -420,7 +420,8 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, - txn_executor + txn_executor, + shutting_down }). -record(gm_group, { name, version, members }). @@ -567,10 +568,18 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + txn_executor = TxnFun, + shutting_down = false }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +handle_call({confirmed_broadcast, Msg}, _From, + State = #state { shutting_down = {true, _} }) -> + rabbit_log:info( + "GM ~p: Dropping confirmed_broadcast, shutting down in progress:~n~p~n", + [self(), Msg]), + reply(shutting_down, State); + handle_call({confirmed_broadcast, _Msg}, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); @@ -645,6 +654,13 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); +handle_cast({broadcast, Msg, _SizeHint}, + State = #state { shutting_down = {true, _} }) -> + rabbit_log:info( + "GM ~p: Dropping broadcast, shutting down in progress:~n~p~n", + [self(), Msg]), + noreply(State); + handle_cast({broadcast, _Msg, _SizeHint}, State = #state { members_state = undefined }) -> noreply(State); @@ -744,7 +760,7 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, terminate(Reason, State = #state { module = Module, callback_args = Args }) -> - flush_broadcast_buffer(State), + rabbit_log:info("GM ~p: Exiting with state: ~p~n", [self(), State]), Module:handle_terminate(Args, Reason). @@ -1427,15 +1443,50 @@ activity_true(_Result, Activity, State = #state { module = Module, activity_false(Result, _Activity, State) -> {Result, State}. -if_callback_success(ok, True, _False, Arg, State) -> +if_callback_success(Result, True, False, Arg, State) -> + {NewResult, NewState} = maybe_stop(Result, State), + if_callback_success1(NewResult, True, False, Arg, NewState). + +if_callback_success1(ok, True, _False, Arg, State) -> True(ok, Arg, State); -if_callback_success( +if_callback_success1( {become, Module, Args} = Result, True, _False, Arg, State) -> True(Result, Arg, State #state { module = Module, callback_args = Args }); -if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> +if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) -> False(Result, Arg, State). +maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) -> + ShuttingDown = {true, Reason}, + case has_pending_messages(State) of + true -> rabbit_log:info( + "GM ~p: Pending messages, stop delayed (a)~n", [self()]), + {ok, State #state{ shutting_down = ShuttingDown }}; + false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }} + end; +maybe_stop(Result, #state{ shutting_down = false } = State) -> + {Result, State}; +maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) -> + case has_pending_messages(State) of + true -> rabbit_log:info( + "GM ~p: Pending messages, stop delayed (b)~n", [self()]), + {Result, State}; + false -> {{stop, Reason}, State} + end. + +has_pending_messages(#state{ broadcast_buffer = Buffer }) + when Buffer =/= [] -> + rabbit_log:info("GM ~p: Has pending messages in broadcast_buffer? true~n", + [self()]), + true; +has_pending_messages(#state{ members_state = MembersState }) -> + Ret = ([] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M} + <- MembersState, + LP =/= LA]), + rabbit_log:info("GM ~p: Has pending messages in members_state? ~p~n", + [self(), Ret]), + Ret. + maybe_confirm(_Self, _Id, Confirms, []) -> Confirms; maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> |
