diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-27 17:04:24 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-27 17:04:24 +0000 |
| commit | 44928187312c9ffaaa272a4daa85211b4d400e34 (patch) | |
| tree | c4919dfd992f6ff8641cd5b9c0201aa5a90fcdd0 | |
| parent | 3b0adfda40edf59496ff9f6d994a11c27971a3f5 (diff) | |
| download | rabbitmq-server-git-44928187312c9ffaaa272a4daa85211b4d400e34.tar.gz | |
rabbit_fifo: cancel should not remove single active consumer
This change keeps a cancelled single active consumer in the consuemrs
map but with the cancelled status allowing another consumer to take over
as the active one.
[#164135123]
| -rw-r--r-- | src/rabbit_fifo.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 55 |
3 files changed, 88 insertions, 24 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 64f5b09bb4..9ddecb0824 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -784,6 +784,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {Consumer, Cons1} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed + %% if the consumer has unacked messages. This is a bit weird but + %% in line with what classic queues do (from an external point of + %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), case maps:size(S#?MODULE.consumers) of 0 -> @@ -796,32 +800,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. -activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects) - when map_size(Cons) == 1 -> - {State0, Effects}; -activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0, +activate_next_consumer(#?MODULE{consumers = Cons, + waiting_consumers = Waiting0} = State0, Effects0) -> - case lists:filter(fun ({_, #consumer{status = Status}}) -> - Status == up - end, Waiting0) of - [{NextConsumerId, NextConsumer} | _] -> - Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), - #?MODULE{service_queue = ServiceQueue} = State0, - ServiceQueue1 = maybe_queue_consumer(NextConsumerId, - NextConsumer, - ServiceQueue), - State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer}, - service_queue = ServiceQueue1, - waiting_consumers = Remaining}, - Effects = consumer_update_active_effects(State, NextConsumerId, - NextConsumer, true, - single_active, Effects0), - {State, Effects}; - [] -> - {State0, Effects0} + case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of + Up when map_size(Up) == 0 -> + %% there are no active consumer in the consumer map + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + %% there is a potential next active consumer + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, Effects0} + end; + _ -> + {State0, Effects0} end. + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e811bfffb3..28cfe9e558 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -190,7 +190,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, + [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 6cc167b050..a45d423e69 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1029,9 +1029,62 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ct:pal("Effects3 ~w", [Effects3]), ?assertEqual(5, length(Effects3)). +single_active_cancelled_with_unacked_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 1, simple_prefetch}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% enqueue 2 messages + {State2, _Effects2} = enq(3, 1, msg1, State1), + {State3, _Effects3} = enq(4, 2, msg2, State2), + %% one should be checked ou to C1 + %% cancel C1 + {State4, _, _} = apply(meta(5), + make_checkout(C1, cancel, #{}), + State3), + %% C2 should be the active consumer + ?assertMatch(#{C2 := #consumer{status = up, + checked_out = #{0 := _}}}, + State4#rabbit_fifo.consumers), + %% C1 should be a cancelled consumer + ?assertMatch(#{C1 := #consumer{status = cancelled, + lifetime = once, + checked_out = #{0 := _}}}, + State4#rabbit_fifo.consumers), + ?assertMatch([], State4#rabbit_fifo.waiting_consumers), + + %% Ack both messages + {State5, _Effects5} = settle(C1, 1, 0, State4), + %% C1 should now be cancelled + {State6, _Effects6} = settle(C2, 2, 0, State5), + + %% C2 should remain + ?assertMatch(#{C2 := #consumer{status = up}}, + State6#rabbit_fifo.consumers), + %% C1 should be gone + ?assertNotMatch(#{C1 := _}, + State6#rabbit_fifo.consumers), + ?assertMatch([], State6#rabbit_fifo.waiting_consumers), + ok. + meta(Idx) -> #{index => Idx, term => 1}. |
