summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl51
-rw-r--r--src/rabbit_fifo_client.erl9
2 files changed, 36 insertions, 24 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 376ab5e5a2..8c7b208855 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection},
Effects0, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
- % mark all consumers and enqueuers as suspect
- % and monitor the node
- {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- {maps:put(K, C#consumer{suspected_down = true,
- checked_out = #{}},
- Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ % mark all consumers and enqueuers as suspected down
+ % and monitor the node so that we can find out the final state of the
+ % process at some later point
+ {Cons, State} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ %% TODO: need to increment credit here
+ %% with the size of the Checked map
+ Credit = increase_credit(C, maps:size(Checked0)),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -587,7 +593,9 @@ overview(#state{consumers = Cons,
get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
- [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)];
+ [{K, snd(snd(maps:get(K, Checked)))}
+ || K <- lists:seq(From, To),
+ maps:is_key(K, Checked)];
_ ->
[]
end.
@@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
+return(ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
- Con = case Life of
- auto ->
- Num = length(MsgNumMsgs),
- Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, Num)};
- once ->
- Con0#consumer{checked_out = Checked}
- end,
+ Con = Con0#consumer{checked_out = Checked,
+ credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
@@ -1505,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
Node = node(Pid),
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
- {State1, Effects1} = check(Cid, 2, State0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
+ %% validate consumer has credit
{State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 04a82c0e5e..955c0e4d9d 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -622,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
CDels0)}};
#consumer{last_msg_id = Prev} = C
when FstId > Prev+1 ->
+ NumMissing = FstId - Prev + 1,
+ %% there may actually be fewer missing messages returned than expected
+ %% This can happen when a node the channel is on gets disconnected
+ %% from the node the leader is on and then reconnected afterwards.
+ %% When the node is disconnected the leader will return all checked
+ %% out messages to the main queue to ensure they don't get stuck in
+ %% case the node never comes back.
Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
Del = {delivery, Tag, Missing ++ IdMsgs},
{Del, State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
- length(IdMsgs) + length(Missing),
+ length(IdMsgs) + NumMissing,
C, CDels0)}};
#consumer{last_msg_id = Prev}
when FstId =< Prev ->