diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 23 |
1 files changed, 20 insertions, 3 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9f24a677b0..38c86cd3ba 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -395,7 +395,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node} | Effects0] end, {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; -apply(_, {down, Pid, _Info}, Effects0, +apply(_, {down, Pid, _Info} = D, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -417,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -433,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}. @@ -879,6 +894,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, |
