summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2018-12-03 09:54:25 +0000
committerGitHub <noreply@github.com>2018-12-03 09:54:25 +0000
commitfc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f (patch)
treebb65c12a7b7adbd40fdf2554a5a43ad560ee3c62 /src
parentb247f2ab1e478ff71c1d63c49de3474148ba5019 (diff)
parentbedface2ffbc20d3b20a41f1f8232b43458b7dcb (diff)
downloadrabbitmq-server-git-fc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f.tar.gz
Merge pull request #1771 from rabbitmq/return-unack-any-down-reason
Return delivered but unack messages to the queue for noconnection reason
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl59
1 files changed, 49 insertions, 10 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..bb2600dd73 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -427,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok}.
@@ -583,9 +604,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -788,6 +807,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = queue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -871,6 +894,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1289,6 +1314,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, queue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, queue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, queue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),