summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_queue_consumers.erl32
2 files changed, 24 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c3aa3b179a..e539dad2a3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 604eaef7ce..f973ddcb73 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_consumers).
@@ -200,14 +200,24 @@ deliver(_FetchFun, _QName, false, State, true, none) ->
{undelivered, false,
State#state{use = update_use(State#state.use, inactive)}};
deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) ->
- case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of
- {delivered, R} ->
- {delivered, false, R, State};
- undelivered ->
- {ChPid, Consumer} = ExclusiveConsumer,
- Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
- {undelivered, true,
- State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ {ChPid, Consumer} = ExclusiveConsumer,
+ %% blocked consumers are removed from the queue state, but not the exclusive_consumer field,
+ %% so we need to do this check to avoid adding the exclusive consumer to the channel record
+ %% over and over
+ case is_blocked(ExclusiveConsumer) of
+ true ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+ false ->
+ case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of
+ {delivered, R} ->
+ {delivered, false, R, State};
+ undelivered ->
+ {ChPid, Consumer} = ExclusiveConsumer,
+ Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
+ {undelivered, true,
+ State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ end
end;
deliver(FetchFun, QName, ConsumersChanged,
State = #state{consumers = Consumers}, false, _ExclusiveConsumer) ->
@@ -263,6 +273,10 @@ deliver_to_consumer(FetchFun,
unsent_message_count = Count + 1}),
R.
+is_blocked(Consumer = {ChPid, _C}) ->
+ #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
+ priority_queue:member(Consumer, BlockedConsumers).
+
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),