diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-07-29 13:26:40 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-07-29 13:26:40 +0300 |
| commit | aaa09daf1a451d141a49a2b90238db9852d657d9 (patch) | |
| tree | 9f73dba2f33587fed27d548a24b7196aafa973ec | |
| parent | ac5373f34ec1dac6ad338d14580d28c7c2d4132e (diff) | |
| parent | 4606c363a06482164a34b1255f4f341a6528c5e8 (diff) | |
| download | rabbitmq-server-git-aaa09daf1a451d141a49a2b90238db9852d657d9.tar.gz | |
Merge branch 'rabbitmq-server-224' into stable
| -rw-r--r-- | src/gm.erl | 47 |
1 files changed, 41 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl index 95dc84e41b..4c1b4ee4cb 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -420,7 +420,8 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, - txn_executor + txn_executor, + shutting_down }). -record(gm_group, { name, version, members }). @@ -567,11 +568,16 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + txn_executor = TxnFun, + shutting_down = false }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({confirmed_broadcast, _Msg}, _From, + State = #state { shutting_down = {true, _} }) -> + reply(shutting_down, State); + +handle_call({confirmed_broadcast, _Msg}, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); @@ -646,6 +652,10 @@ handle_cast({?TAG, ReqVer, Msg}, Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { shutting_down = {true, _} }) -> + noreply(State); + +handle_cast({broadcast, _Msg, _SizeHint}, State = #state { members_state = undefined }) -> noreply(State); @@ -744,7 +754,6 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, terminate(Reason, State = #state { module = Module, callback_args = Args }) -> - flush_broadcast_buffer(State), Module:handle_terminate(Args, Reason). @@ -1427,15 +1436,41 @@ activity_true(_Result, Activity, State = #state { module = Module, activity_false(Result, _Activity, State) -> {Result, State}. -if_callback_success(ok, True, _False, Arg, State) -> +if_callback_success(Result, True, False, Arg, State) -> + {NewResult, NewState} = maybe_stop(Result, State), + if_callback_success1(NewResult, True, False, Arg, NewState). + +if_callback_success1(ok, True, _False, Arg, State) -> True(ok, Arg, State); -if_callback_success( +if_callback_success1( {become, Module, Args} = Result, True, _False, Arg, State) -> True(Result, Arg, State #state { module = Module, callback_args = Args }); -if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> +if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) -> False(Result, Arg, State). +maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) -> + ShuttingDown = {true, Reason}, + case has_pending_messages(State) of + true -> {ok, State #state{ shutting_down = ShuttingDown }}; + false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }} + end; +maybe_stop(Result, #state{ shutting_down = false } = State) -> + {Result, State}; +maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) -> + case has_pending_messages(State) of + true -> {Result, State}; + false -> {{stop, Reason}, State} + end. + +has_pending_messages(#state{ broadcast_buffer = Buffer }) + when Buffer =/= [] -> + true; +has_pending_messages(#state{ members_state = MembersState }) -> + [] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M} + <- MembersState, + LP =/= LA]. + maybe_confirm(_Self, _Id, Confirms, []) -> Confirms; maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> |
