summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-10-24 11:27:16 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-10-24 11:27:16 +0200
commitbf7d62f9b89bea590f6f2f036c4a4b6f9440ed30 (patch)
treeeb2e2db01ae9e8957af1127b07846f957c25ee98
parentec3f83f7134fca4e42422ed21740dcfec395cc46 (diff)
downloadrabbitmq-server-git-bf7d62f9b89bea590f6f2f036c4a4b6f9440ed30.tar.gz
Forbid AMQP exclusive consume in exclusive consumer queue
[#161090309] References #1743
-rw-r--r--src/rabbit_amqqueue_process.erl47
-rw-r--r--test/exclusive_consumer_SUITE.erl31
2 files changed, 47 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4d06ff28af..c3aa3b179a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1234,11 +1234,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
exclusive_consumer_on = ExclusiveConsumerOn}) ->
- State1 = case ExclusiveConsumerOn of
+ ConsumerRegistration = case ExclusiveConsumerOn of
true ->
case ExclusiveConsume of
true ->
- reply({error, exclusive_consume_unavailable}, State);
+ {error, reply({error, exclusive_consume_unavailable}, State)};
false ->
Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
@@ -1249,17 +1249,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
case Holder of
none ->
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
- State#q{consumers = Consumers1,
- has_had_consumers = true,
- exclusive_consumer = NewConsumer};
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ exclusive_consumer = NewConsumer}};
_ ->
- State#q{consumers = Consumers1,
- has_had_consumers = true}
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true}}
end
end;
false ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use -> reply({error, exclusive_consume_unavailable}, State);
+ in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
@@ -1269,22 +1269,27 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
- State#q{consumers = Consumers1,
+ {state, State#q{consumers = Consumers1,
has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer}
+ exclusive_consumer = ExclusiveConsumer}}
end
end,
- ok = maybe_send_reply(ChPid, OkMsg),
- QName = qname(State1),
- AckRequired = not NoAck,
- rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, PrefetchCount,
- Args, none, ActingUser),
- notify_decorators(State1),
- reply(ok, run_message_queue(State1));
+ case ConsumerRegistration of
+ {error, Reply} ->
+ Reply;
+ {state, State1} ->
+ ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, PrefetchCount,
+ Args, none, ActingUser),
+ notify_decorators(State1),
+ reply(ok, run_message_queue(State1))
+ end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
State = #q{consumers = Consumers,
diff --git a/test/exclusive_consumer_SUITE.erl b/test/exclusive_consumer_SUITE.erl
index 8441fb6842..9aaf3df78e 100644
--- a/test/exclusive_consumer_SUITE.erl
+++ b/test/exclusive_consumer_SUITE.erl
@@ -32,7 +32,8 @@ groups() ->
{default, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
- fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ amqp_exclusive_consume_fails_on_exclusive_consumer_queue
]}
].
@@ -58,10 +59,8 @@ end_per_testcase(Testcase, Config) ->
all_messages_go_to_one_consumer(Config) ->
{C, Ch} = connection_and_channel(Config),
- Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}],
- auto_delete = true},
+ Q = queue_declare(Ch),
NbMessages = 5,
- #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare),
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
@@ -86,10 +85,8 @@ all_messages_go_to_one_consumer(Config) ->
fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
- Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}],
- auto_delete = true},
+ Q = queue_declare(Ch),
NbMessages = 10,
- #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare),
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
@@ -130,10 +127,8 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
{C1, Ch1} = connection_and_channel(Config),
{C2, Ch2} = connection_and_channel(Config),
{C3, Ch3} = connection_and_channel(Config),
- Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}],
- auto_delete = true},
+ Q = queue_declare(Ch),
NbMessages = 10,
- #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch1, Declare),
Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]),
Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
@@ -171,11 +166,27 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
[amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
ok.
+amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 403, _}}, _},
+ amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
+ ),
+ amqp_connection:close(C),
+ ok.
+
connection_and_channel(Config) ->
C = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, Ch} = amqp_connection:open_channel(C),
{C, Ch}.
+queue_declare(Channel) ->
+ Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}],
+ auto_delete = true},
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
+ Q.
+
consume({Parent, State, 0}) ->
Parent ! {consumer_done, State};
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->