diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2018-12-03 09:54:25 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-03 09:54:25 +0000 |
| commit | fc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f (patch) | |
| tree | bb65c12a7b7adbd40fdf2554a5a43ad560ee3c62 /src | |
| parent | b247f2ab1e478ff71c1d63c49de3474148ba5019 (diff) | |
| parent | bedface2ffbc20d3b20a41f1f8232b43458b7dcb (diff) | |
| download | rabbitmq-server-git-fc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f.tar.gz | |
Merge pull request #1771 from rabbitmq/return-unack-any-down-reason
Return delivered but unack messages to the queue for noconnection reason
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 59 |
1 files changed, 49 insertions, 10 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 79d4a3effc..bb2600dd73 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection}, Node = node(ConsumerPid), % mark all consumers and enqueuers as suspect % and monitor the node - Cons = maps:map(fun({_, P}, C) when node(P) =:= Node -> - C#consumer{suspected_down = true}; - (_, C) -> C - end, Cons0), + {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + {maps:put(K, C#consumer{suspected_down = true, + checked_out = #{}}, + Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection}, _ -> [{monitor, node, Node} | Effects0] end, - {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; + {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -427,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}. @@ -583,9 +604,7 @@ cancel_consumer(ConsumerId, {Effects0, #state{consumers = C0, name = Name} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> - S = maps:fold(fun (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, S0, Checked0), + S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), case maps:size(Cons) of 0 -> @@ -788,6 +807,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = queue:in(MsgNum, Returns)}. +return_all(State, Checked) -> + maps:fold(fun (_, {MsgNum, Msg}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -871,6 +894,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1289,6 +1314,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), ok. +down_with_noconnection_returns_unack_test() -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#state.messages)), + ?assertEqual(0, queue:len(State0#state.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#state.messages)), + ?assertEqual(0, queue:len(State1#state.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(1, queue:len(State2a#state.returns)), + ok. + down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), |
