summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-27 17:04:24 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-27 17:04:24 +0000
commit44928187312c9ffaaa272a4daa85211b4d400e34 (patch)
treec4919dfd992f6ff8641cd5b9c0201aa5a90fcdd0
parent3b0adfda40edf59496ff9f6d994a11c27971a3f5 (diff)
downloadrabbitmq-server-git-44928187312c9ffaaa272a4daa85211b4d400e34.tar.gz
rabbit_fifo: cancel should not remove single active consumer
This change keeps a cancelled single active consumer in the consuemrs map but with the cancelled status allowing another consumer to take over as the active one. [#164135123]
-rw-r--r--src/rabbit_fifo.erl54
-rw-r--r--src/rabbit_quorum_queue.erl3
-rw-r--r--test/rabbit_fifo_SUITE.erl55
3 files changed, 88 insertions, 24 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 64f5b09bb4..9ddecb0824 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -784,6 +784,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{Consumer, Cons1} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
case maps:size(S#?MODULE.consumers) of
0 ->
@@ -796,32 +800,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
-activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects)
- when map_size(Cons) == 1 ->
- {State0, Effects};
-activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0,
+activate_next_consumer(#?MODULE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
Effects0) ->
- case lists:filter(fun ({_, #consumer{status = Status}}) ->
- Status == up
- end, Waiting0) of
- [{NextConsumerId, NextConsumer} | _] ->
- Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
- #?MODULE{service_queue = ServiceQueue} = State0,
- ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
- NextConsumer,
- ServiceQueue),
- State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer},
- service_queue = ServiceQueue1,
- waiting_consumers = Remaining},
- Effects = consumer_update_active_effects(State, NextConsumerId,
- NextConsumer, true,
- single_active, Effects0),
- {State, Effects};
- [] ->
- {State0, Effects0}
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?MODULE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, Effects0}
+ end;
+ _ ->
+ {State0, Effects0}
end.
+
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
Cons1, #?MODULE{consumers = C0,
service_queue = SQ0} = S0,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e811bfffb3..28cfe9e558 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -190,7 +190,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act
QName, Prefetch, Active, ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
+ [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 6cc167b050..a45d423e69 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1029,9 +1029,62 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
{_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
- ct:pal("Effects3 ~w", [Effects3]),
?assertEqual(5, length(Effects3)).
+single_active_cancelled_with_unacked_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 1, simple_prefetch},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% enqueue 2 messages
+ {State2, _Effects2} = enq(3, 1, msg1, State1),
+ {State3, _Effects3} = enq(4, 2, msg2, State2),
+ %% one should be checked ou to C1
+ %% cancel C1
+ {State4, _, _} = apply(meta(5),
+ make_checkout(C1, cancel, #{}),
+ State3),
+ %% C2 should be the active consumer
+ ?assertMatch(#{C2 := #consumer{status = up,
+ checked_out = #{0 := _}}},
+ State4#rabbit_fifo.consumers),
+ %% C1 should be a cancelled consumer
+ ?assertMatch(#{C1 := #consumer{status = cancelled,
+ lifetime = once,
+ checked_out = #{0 := _}}},
+ State4#rabbit_fifo.consumers),
+ ?assertMatch([], State4#rabbit_fifo.waiting_consumers),
+
+ %% Ack both messages
+ {State5, _Effects5} = settle(C1, 1, 0, State4),
+ %% C1 should now be cancelled
+ {State6, _Effects6} = settle(C2, 2, 0, State5),
+
+ %% C2 should remain
+ ?assertMatch(#{C2 := #consumer{status = up}},
+ State6#rabbit_fifo.consumers),
+ %% C1 should be gone
+ ?assertNotMatch(#{C1 := _},
+ State6#rabbit_fifo.consumers),
+ ?assertMatch([], State6#rabbit_fifo.waiting_consumers),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.