diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-24 16:10:46 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-24 16:10:46 +0200 |
| commit | 1611df56b29b908af24ccc5fb4cf7b6fb20adde8 (patch) | |
| tree | 0a7404d55efcd3e588d4ca76080df388e7243e39 | |
| parent | bf7d62f9b89bea590f6f2f036c4a4b6f9440ed30 (diff) | |
| download | rabbitmq-server-git-1611df56b29b908af24ccc5fb4cf7b6fb20adde8.tar.gz | |
Check exclusive consumer isn't blocked before delivery
References #1743
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 32 |
2 files changed, 24 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c3aa3b179a..e539dad2a3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 604eaef7ce..f973ddcb73 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_queue_consumers). @@ -200,14 +200,24 @@ deliver(_FetchFun, _QName, false, State, true, none) -> {undelivered, false, State#state{use = update_use(State#state.use, inactive)}}; deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) -> - case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of - {delivered, R} -> - {delivered, false, R, State}; - undelivered -> - {ChPid, Consumer} = ExclusiveConsumer, - Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), - {undelivered, true, - State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} + {ChPid, Consumer} = ExclusiveConsumer, + %% blocked consumers are removed from the queue state, but not the exclusive_consumer field, + %% so we need to do this check to avoid adding the exclusive consumer to the channel record + %% over and over + case is_blocked(ExclusiveConsumer) of + true -> + {undelivered, false, + State#state{use = update_use(State#state.use, inactive)}}; + false -> + case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of + {delivered, R} -> + {delivered, false, R, State}; + undelivered -> + {ChPid, Consumer} = ExclusiveConsumer, + Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), + {undelivered, true, + State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} + end end; deliver(FetchFun, QName, ConsumersChanged, State = #state{consumers = Consumers}, false, _ExclusiveConsumer) -> @@ -263,6 +273,10 @@ deliver_to_consumer(FetchFun, unsent_message_count = Count + 1}), R. +is_blocked(Consumer = {ChPid, _C}) -> + #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid), + priority_queue:member(Consumer, BlockedConsumers). + record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), |
