diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 9 |
2 files changed, 36 insertions, 24 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 376ab5e5a2..8c7b208855 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), - % mark all consumers and enqueuers as suspect - % and monitor the node - {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - {maps:put(K, C#consumer{suspected_down = true, - checked_out = #{}}, - Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + % mark all consumers and enqueuers as suspected down + % and monitor the node so that we can find out the final state of the + % process at some later point + {Cons, State} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -587,7 +593,9 @@ overview(#state{consumers = Cons, get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> - [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)]; + [{K, snd(snd(maps:get(K, Checked)))} + || K <- lists:seq(From, To), + maps:is_key(K, Checked)]; _ -> [] end. @@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, +return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = case Life of - auto -> - Num = length(MsgNumMsgs), - Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, Num)}; - once -> - Con0#consumer{checked_out = Checked} - end, + Con = Con0#consumer{checked_out = Checked, + credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> @@ -1505,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> Node = node(Pid), {State0, Effects0} = enq(1, 1, second, test_init(test)), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), - {State1, Effects1} = check(Cid, 2, State0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), + %% validate consumer has credit {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 04a82c0e5e..955c0e4d9d 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -622,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, CDels0)}}; #consumer{last_msg_id = Prev} = C when FstId > Prev+1 -> + NumMissing = FstId - Prev + 1, + %% there may actually be fewer missing messages returned than expected + %% This can happen when a node the channel is on gets disconnected + %% from the node the leader is on and then reconnected afterwards. + %% When the node is disconnected the leader will return all checked + %% out messages to the main queue to ensure they don't get stuck in + %% case the node never comes back. Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, - length(IdMsgs) + length(Missing), + length(IdMsgs) + NumMissing, C, CDels0)}}; #consumer{last_msg_id = Prev} when FstId =< Prev -> |
