diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-02-11 17:52:40 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-02-11 17:52:40 +0000 |
| commit | 7afb62dd97400511d5171c7d468d7250e255d3ab (patch) | |
| tree | 5ace8884c95b3b5e571be737ef744580b0674067 /src | |
| parent | f80034544e7786796d08764e94818340ee73ace3 (diff) | |
| download | rabbitmq-server-git-7afb62dd97400511d5171c7d468d7250e255d3ab.tar.gz | |
Do not return unacked messages in basic.cancel by quorum queues
It's not part of the spec. Messages should stay unacked as the consumer
can still ack them, it simply won't receive new messages.
Consumer state should be kept if unacked messages are present, but not listed.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 107 |
1 files changed, 65 insertions, 42 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 450ba1ac85..58e91f3d19 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -184,7 +184,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - suspected_down = false :: boolean() + suspected_down = false :: 'cancel' | boolean() }). -type consumer() :: #consumer{}. @@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(ConsumerId, State0, []), - % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. leaving a monitor in place isn't harmful - % however + {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), checkout(Meta, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, @@ -469,7 +466,8 @@ apply(_, {down, ConsumerPid, noconnection}, {Cons, State, Effects1} = maps:fold( fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when node(P) =:= Node -> + {Co, St0, Eff}) when (node(P) =:= Node) and + (C#consumer.suspected_down =/= cancel)-> St = return_all(St0, Checked0), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), @@ -514,7 +512,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E) + cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #state{consumers = Cons0, @@ -538,7 +536,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when node(P) =:= Node -> + when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) -> EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, @@ -606,7 +604,8 @@ maybe_mark_suspect_waiting_consumers(Node, _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.suspected_down =/= cancel]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -758,18 +757,24 @@ query_consumers(#state{consumers = Consumers, end end end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - Active, - ActivityStatus, - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers), + FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) -> + Acc; + ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, @@ -854,42 +859,42 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects) -> + #state{consumer_strategy = default} = State, Effects, Reason) -> %% general case, single active consumer off - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumer_strategy = single_active, waiting_consumers = []} = State, - Effects) -> + Effects, Reason) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumers = Cons0, consumer_strategy = single_active, waiting_consumers = WaitingConsumers0} = State0, - Effects0) -> + Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of - {#consumer{checked_out = Checked0}, _} -> + {Consumer, Cons1} -> % The active consumer is to be removed % Cancel it - State1 = return_all(State0, Checked0), - Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0), + {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason), + Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, - #state{service_queue = ServiceQueue} = State0, + #state{service_queue = ServiceQueue} = State1, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State = State1#state{consumers = #{NewActiveConsumerId => - NewActiveConsumer}, + State = State1#state{consumers = maps:put(NewActiveConsumerId, + NewActiveConsumer, State1#state.consumers), service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, + Effects = consumer_update_active_effects(State, NewActiveConsumerId, NewActiveConsumer, true, - single_active, Effects1), - {State, Effects2}; + single_active, Effects2), + {State, Effects}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -914,23 +919,39 @@ consumer_update_active_effects(#state{queue_resource = QName }, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, - #state{consumers = C0} = S0, Effects0) -> +cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of - {#consumer{checked_out = Checked0}, Cons} -> - S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - case maps:size(Cons) of + {Consumer, Cons1} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#state.consumers) of 0 -> - {S#state{consumers = Cons}, [{aux, inactive} | Effects]}; + {S, [{aux, inactive} | Effects]}; _ -> - {S#state{consumers = Cons}, Effects} + {S, Effects} end; error -> %% already removed: do nothing {S0, Effects0} end. +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, + #state{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> + case Reason of + consumer_cancel -> + {Cons, SQ, Effects1} = + update_or_remove_sub(ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + suspected_down = cancel}, + C0, SQ0, Effects0), + {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; + down -> + S1 = return_all(S0, Checked0), + {S1#state{consumers = Cons1}, Effects0} + end. + apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of @@ -1303,6 +1324,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = cancel}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{suspected_down = true}} -> checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, |
