summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-02-11 17:52:40 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-02-11 17:52:40 +0000
commit7afb62dd97400511d5171c7d468d7250e255d3ab (patch)
tree5ace8884c95b3b5e571be737ef744580b0674067 /src
parentf80034544e7786796d08764e94818340ee73ace3 (diff)
downloadrabbitmq-server-git-7afb62dd97400511d5171c7d468d7250e255d3ab.tar.gz
Do not return unacked messages in basic.cancel by quorum queues
It's not part of the spec. Messages should stay unacked as the consumer can still ack them, it simply won't receive new messages. Consumer state should be kept if unacked messages are present, but not listed.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl107
1 files changed, 65 insertions, 42 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 450ba1ac85..58e91f3d19 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -184,7 +184,7 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- suspected_down = false :: boolean()
+ suspected_down = false :: 'cancel' | boolean()
}).
-type consumer() :: #consumer{}.
@@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, []),
- % TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers. leaving a monitor in place isn't harmful
- % however
+ {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
checkout(Meta, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
@@ -469,7 +466,8 @@ apply(_, {down, ConsumerPid, noconnection},
{Cons, State, Effects1} = maps:fold(
fun({_, P} = K,
#consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when node(P) =:= Node ->
+ {Co, St0, Eff}) when (node(P) =:= Node) and
+ (C#consumer.suspected_down =/= cancel)->
St = return_all(St0, Checked0),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
@@ -514,7 +512,7 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E)
+ cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
@@ -538,7 +536,7 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when node(P) =:= Node ->
+ when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) ->
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
@@ -606,7 +604,8 @@ maybe_mark_suspect_waiting_consumers(Node,
_ ->
{ConsumerId, Consumer}
end
- end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers].
+ end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.suspected_down =/= cancel].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -758,18 +757,24 @@ query_consumers(#state{consumers = Consumers,
end
end
end,
- FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
- {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- Active,
- ActivityStatus,
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)}
- end, Consumers),
+ FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
FromWaitingConsumers =
- lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) ->
+ Acc;
+ ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
@@ -854,42 +859,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{consumer_strategy = default} = State, Effects) ->
+ #state{consumer_strategy = default} = State, Effects, Reason) ->
%% general case, single active consumer off
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumer_strategy = single_active,
waiting_consumers = []} = State,
- Effects) ->
+ Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumers = Cons0,
consumer_strategy = single_active,
waiting_consumers = WaitingConsumers0} = State0,
- Effects0) ->
+ Effects0, Reason) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
- {#consumer{checked_out = Checked0}, _} ->
+ {Consumer, Cons1} ->
% The active consumer is to be removed
% Cancel it
- State1 = return_all(State0, Checked0),
- Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0),
+ {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason),
+ Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
| RemainingWaitingConsumers] = WaitingConsumers0,
- #state{service_queue = ServiceQueue} = State0,
+ #state{service_queue = ServiceQueue} = State1,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
NewActiveConsumer,
ServiceQueue),
- State = State1#state{consumers = #{NewActiveConsumerId =>
- NewActiveConsumer},
+ State = State1#state{consumers = maps:put(NewActiveConsumerId,
+ NewActiveConsumer, State1#state.consumers),
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
+ Effects = consumer_update_active_effects(State, NewActiveConsumerId,
NewActiveConsumer, true,
- single_active, Effects1),
- {State, Effects2};
+ single_active, Effects2),
+ {State, Effects};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -914,23 +919,39 @@ consumer_update_active_effects(#state{queue_resource = QName },
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId,
- #state{consumers = C0} = S0, Effects0) ->
+cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
- {#consumer{checked_out = Checked0}, Cons} ->
- S = return_all(S0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- case maps:size(Cons) of
+ {Consumer, Cons1} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#state.consumers) of
0 ->
- {S#state{consumers = Cons}, [{aux, inactive} | Effects]};
+ {S, [{aux, inactive} | Effects]};
_ ->
- {S#state{consumers = Cons}, Effects}
+ {S, Effects}
end;
error ->
%% already removed: do nothing
{S0, Effects0}
end.
+maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
+ #state{consumers = C0,
+ service_queue = SQ0} = S0, Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {Cons, SQ, Effects1} =
+ update_or_remove_sub(ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ suspected_down = cancel},
+ C0, SQ0, Effects0),
+ {S0#state{consumers = Cons, service_queue = SQ}, Effects1};
+ down ->
+ S1 = return_all(S0, Checked0),
+ {S1#state{consumers = Cons1}, Effects0}
+ end.
+
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
@@ -1303,6 +1324,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = cancel}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{suspected_down = true}} ->
checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,