summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-02 11:45:50 +0000
committerkjnilsson <knilsson@pivotal.io>2018-11-02 11:45:50 +0000
commitf47675060d40f5796f1b1e55adb64a621c42bb74 (patch)
tree66643c4ab124e73a093cc9c646782ea66c491113 /src
parent9a2ea744ab56fe1caf18c0a70ea850d4a65745a3 (diff)
downloadrabbitmq-server-git-f47675060d40f5796f1b1e55adb64a621c42bb74.tar.gz
rabbit_fifo: state_enter fix
when a ra server becomes leader rabbit_fifo should re-issue monitor effects for both consumers and enqueuers.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl33
1 files changed, 26 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index f53ffaa760..3fceb93654 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -413,18 +413,21 @@ apply(_, {down, Pid, _Info}, Effects0,
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
+ %% A node we are monitoring has come back.
+ %% If we have suspected any processes of being
+ %% down we should now re-issue the monitors for them to detect if they're
+ %% actually down or not
Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
- when node(P) =:= Node ->
- [P | Acc];
- (_, _, Acc) -> Acc
- end, [], Cons0),
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
- % TODO: should we unsuspect these processes here?
% TODO: avoid list concat
{State0, Monitors ++ Effects0, ok};
apply(_, {nodedown, _Node}, Effects, State) ->
@@ -432,10 +435,13 @@ apply(_, {nodedown, _Node}, Effects, State) ->
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Custs,
+ enqueuers = Enqs,
name = Name,
become_leader_handler = BLH}) ->
- % return effects to monitor all current consumerss
- Effects = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
+ % return effects to monitor all current consumers and enqueuers
+ ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
+ EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
+ Effects = ConMons ++ EnqMons,
case BLH of
undefined ->
Effects;
@@ -1509,6 +1515,19 @@ state_enter_test() ->
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
+leader_monitors_on_state_enter_test() ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ Self = self(),
+ %% as we have an enqueuer _and_ a consumer we chould
+ %% get two monitor effects in total, even if they are for the same
+ %% processs
+ [{monitor, process, Self},
+ {monitor, process, Self}] = state_enter(leader, State1),
+ ok.
+
+
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),