diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 3 |
2 files changed, 34 insertions, 23 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 64f5b09bb4..9ddecb0824 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -784,6 +784,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {Consumer, Cons1} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed + %% if the consumer has unacked messages. This is a bit weird but + %% in line with what classic queues do (from an external point of + %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), case maps:size(S#?MODULE.consumers) of 0 -> @@ -796,32 +800,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. -activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects) - when map_size(Cons) == 1 -> - {State0, Effects}; -activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0, +activate_next_consumer(#?MODULE{consumers = Cons, + waiting_consumers = Waiting0} = State0, Effects0) -> - case lists:filter(fun ({_, #consumer{status = Status}}) -> - Status == up - end, Waiting0) of - [{NextConsumerId, NextConsumer} | _] -> - Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), - #?MODULE{service_queue = ServiceQueue} = State0, - ServiceQueue1 = maybe_queue_consumer(NextConsumerId, - NextConsumer, - ServiceQueue), - State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer}, - service_queue = ServiceQueue1, - waiting_consumers = Remaining}, - Effects = consumer_update_active_effects(State, NextConsumerId, - NextConsumer, true, - single_active, Effects0), - {State, Effects}; - [] -> - {State0, Effects0} + case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of + Up when map_size(Up) == 0 -> + %% there are no active consumer in the consumer map + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + %% there is a potential next active consumer + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, Effects0} + end; + _ -> + {State0, Effects0} end. + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e811bfffb3..28cfe9e558 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -190,7 +190,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, + [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), |
