summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-11-20 12:03:30 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-03 09:27:17 +0000
commitc318bb9d8685e66b9dbbaa2171b4a8ae3c95290b (patch)
treee5a5b831042e41b95084a14b6b5d40be269ee0d7 /src
parent0c7c7d960c8ca54b5b29c06d295acef0bc9f3c7a (diff)
downloadrabbitmq-server-git-c318bb9d8685e66b9dbbaa2171b4a8ae3c95290b.tar.gz
Return delivered but unack messages to the queeu for noconnection reason
[#161679638]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl38
1 files changed, 30 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..9f24a677b0 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -583,9 +589,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -788,6 +792,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = queue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -1289,6 +1297,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, queue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, queue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, queue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),