diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-18 12:46:00 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-18 12:46:00 +0000 |
| commit | 78eedc2323bdf318d0c340f0479ae8ea1065a7b2 (patch) | |
| tree | bd253acd049dce59b8338d190a50c77c505456c0 /src | |
| parent | 8636e72a0f7500b6b78bc2dbb4ddc92f2d5bd3e5 (diff) | |
| download | rabbitmq-server-git-78eedc2323bdf318d0c340f0479ae8ea1065a7b2.tar.gz | |
Quorum queue: fix crash bug after reconnection
When a consumer node is disconnected the quorum queue will return all
outstanding messages for the consumer to the queue as it cannot know if
the consumer will ever come back or not and cannot leave things checked
out forever. If the consumer node then reconnectes and the consumer
channel is still alive it may try to query for missing deliveries and
this previously crashed if the messages had been returned to the main
queue. This fix makes the query safer as well as handling missing
messages better.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 9 |
2 files changed, 36 insertions, 24 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 376ab5e5a2..8c7b208855 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), - % mark all consumers and enqueuers as suspect - % and monitor the node - {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - {maps:put(K, C#consumer{suspected_down = true, - checked_out = #{}}, - Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + % mark all consumers and enqueuers as suspected down + % and monitor the node so that we can find out the final state of the + % process at some later point + {Cons, State} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -587,7 +593,9 @@ overview(#state{consumers = Cons, get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> - [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)]; + [{K, snd(snd(maps:get(K, Checked)))} + || K <- lists:seq(From, To), + maps:is_key(K, Checked)]; _ -> [] end. @@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, +return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = case Life of - auto -> - Num = length(MsgNumMsgs), - Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, Num)}; - once -> - Con0#consumer{checked_out = Checked} - end, + Con = Con0#consumer{checked_out = Checked, + credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> @@ -1505,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> Node = node(Pid), {State0, Effects0} = enq(1, 1, second, test_init(test)), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), - {State1, Effects1} = check(Cid, 2, State0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), + %% validate consumer has credit {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 04a82c0e5e..955c0e4d9d 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -622,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, CDels0)}}; #consumer{last_msg_id = Prev} = C when FstId > Prev+1 -> + NumMissing = FstId - Prev + 1, + %% there may actually be fewer missing messages returned than expected + %% This can happen when a node the channel is on gets disconnected + %% from the node the leader is on and then reconnected afterwards. + %% When the node is disconnected the leader will return all checked + %% out messages to the main queue to ensure they don't get stuck in + %% case the node never comes back. Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, - length(IdMsgs) + length(Missing), + length(IdMsgs) + NumMissing, C, CDels0)}}; #consumer{last_msg_id = Prev} when FstId =< Prev -> |
