summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-27 17:04:24 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-27 17:04:24 +0000
commit44928187312c9ffaaa272a4daa85211b4d400e34 (patch)
treec4919dfd992f6ff8641cd5b9c0201aa5a90fcdd0 /src
parent3b0adfda40edf59496ff9f6d994a11c27971a3f5 (diff)
downloadrabbitmq-server-git-44928187312c9ffaaa272a4daa85211b4d400e34.tar.gz
rabbit_fifo: cancel should not remove single active consumer
This change keeps a cancelled single active consumer in the consuemrs map but with the cancelled status allowing another consumer to take over as the active one. [#164135123]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl54
-rw-r--r--src/rabbit_quorum_queue.erl3
2 files changed, 34 insertions, 23 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 64f5b09bb4..9ddecb0824 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -784,6 +784,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{Consumer, Cons1} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
case maps:size(S#?MODULE.consumers) of
0 ->
@@ -796,32 +800,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
-activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects)
- when map_size(Cons) == 1 ->
- {State0, Effects};
-activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0,
+activate_next_consumer(#?MODULE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
Effects0) ->
- case lists:filter(fun ({_, #consumer{status = Status}}) ->
- Status == up
- end, Waiting0) of
- [{NextConsumerId, NextConsumer} | _] ->
- Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
- #?MODULE{service_queue = ServiceQueue} = State0,
- ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
- NextConsumer,
- ServiceQueue),
- State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer},
- service_queue = ServiceQueue1,
- waiting_consumers = Remaining},
- Effects = consumer_update_active_effects(State, NextConsumerId,
- NextConsumer, true,
- single_active, Effects0),
- {State, Effects};
- [] ->
- {State0, Effects0}
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?MODULE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, Effects0}
+ end;
+ _ ->
+ {State0, Effects0}
end.
+
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
Cons1, #?MODULE{consumers = C0,
service_queue = SQ0} = S0,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e811bfffb3..28cfe9e558 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -190,7 +190,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act
QName, Prefetch, Active, ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
+ [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),