summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-07-29 15:02:06 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-07-29 15:02:06 +0300
commitc9c2876b3dff656ddabedc0b2ab8d9ed11853297 (patch)
tree87959ef73cb59256afc8a120c9d0fcb80dcb9da1
parentc3088e96e2b808676b87801940f0e77ba487755a (diff)
parentaaa09daf1a451d141a49a2b90238db9852d657d9 (diff)
downloadrabbitmq-server-git-c9c2876b3dff656ddabedc0b2ab8d9ed11853297.tar.gz
Merge branch 'stable'
-rw-r--r--src/gm.erl47
1 files changed, 41 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 95dc84e41b..4c1b4ee4cb 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -420,7 +420,8 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
- txn_executor
+ txn_executor,
+ shutting_down
}).
-record(gm_group, { name, version, members }).
@@ -567,11 +568,16 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ txn_executor = TxnFun,
+ shutting_down = false }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({confirmed_broadcast, _Msg}, _From,
+ State = #state { shutting_down = {true, _} }) ->
+ reply(shutting_down, State);
+
+handle_call({confirmed_broadcast, _Msg}, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
@@ -646,6 +652,10 @@ handle_cast({?TAG, ReqVer, Msg},
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { shutting_down = {true, _} }) ->
+ noreply(State);
+
+handle_cast({broadcast, _Msg, _SizeHint},
State = #state { members_state = undefined }) ->
noreply(State);
@@ -744,7 +754,6 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
terminate(Reason, State = #state { module = Module,
callback_args = Args }) ->
- flush_broadcast_buffer(State),
Module:handle_terminate(Args, Reason).
@@ -1427,15 +1436,41 @@ activity_true(_Result, Activity, State = #state { module = Module,
activity_false(Result, _Activity, State) ->
{Result, State}.
-if_callback_success(ok, True, _False, Arg, State) ->
+if_callback_success(Result, True, False, Arg, State) ->
+ {NewResult, NewState} = maybe_stop(Result, State),
+ if_callback_success1(NewResult, True, False, Arg, NewState).
+
+if_callback_success1(ok, True, _False, Arg, State) ->
True(ok, Arg, State);
-if_callback_success(
+if_callback_success1(
{become, Module, Args} = Result, True, _False, Arg, State) ->
True(Result, Arg, State #state { module = Module,
callback_args = Args });
-if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
False(Result, Arg, State).
+maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
+ ShuttingDown = {true, Reason},
+ case has_pending_messages(State) of
+ true -> {ok, State #state{ shutting_down = ShuttingDown }};
+ false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
+ end;
+maybe_stop(Result, #state{ shutting_down = false } = State) ->
+ {Result, State};
+maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
+ case has_pending_messages(State) of
+ true -> {Result, State};
+ false -> {{stop, Reason}, State}
+ end.
+
+has_pending_messages(#state{ broadcast_buffer = Buffer })
+ when Buffer =/= [] ->
+ true;
+has_pending_messages(#state{ members_state = MembersState }) ->
+ [] =/= [M || {_, #member{last_pub = LP, last_ack = LA} = M}
+ <- MembersState,
+ LP =/= LA].
+
maybe_confirm(_Self, _Id, Confirms, []) ->
Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->