diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-24 11:27:16 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-24 11:27:16 +0200 |
| commit | bf7d62f9b89bea590f6f2f036c4a4b6f9440ed30 (patch) | |
| tree | eb2e2db01ae9e8957af1127b07846f957c25ee98 | |
| parent | ec3f83f7134fca4e42422ed21740dcfec395cc46 (diff) | |
| download | rabbitmq-server-git-bf7d62f9b89bea590f6f2f036c4a4b6f9440ed30.tar.gz | |
Forbid AMQP exclusive consume in exclusive consumer queue
[#161090309]
References #1743
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
| -rw-r--r-- | test/exclusive_consumer_SUITE.erl | 31 |
2 files changed, 47 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4d06ff28af..c3aa3b179a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1234,11 +1234,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder, exclusive_consumer_on = ExclusiveConsumerOn}) -> - State1 = case ExclusiveConsumerOn of + ConsumerRegistration = case ExclusiveConsumerOn of true -> case ExclusiveConsume of true -> - reply({error, exclusive_consume_unavailable}, State); + {error, reply({error, exclusive_consume_unavailable}, State)}; false -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, @@ -1249,17 +1249,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, case Holder of none -> NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), - State#q{consumers = Consumers1, - has_had_consumers = true, - exclusive_consumer = NewConsumer}; + {state, State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = NewConsumer}}; _ -> - State#q{consumers = Consumers1, - has_had_consumers = true} + {state, State#q{consumers = Consumers1, + has_had_consumers = true}} end end; false -> case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> reply({error, exclusive_consume_unavailable}, State); + in_use -> {error, reply({error, exclusive_consume_unavailable}, State)}; ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, @@ -1269,22 +1269,27 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder end, - State#q{consumers = Consumers1, + {state, State#q{consumers = Consumers1, has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer} + exclusive_consumer = ExclusiveConsumer}} end end, - ok = maybe_send_reply(ChPid, OkMsg), - QName = qname(State1), - AckRequired = not NoAck, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, PrefetchCount, - Args, none, ActingUser), - notify_decorators(State1), - reply(ok, run_message_queue(State1)); + case ConsumerRegistration of + {error, Reply} -> + Reply; + {state, State1} -> + ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), + notify_decorators(State1), + reply(ok, run_message_queue(State1)) + end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, State = #q{consumers = Consumers, diff --git a/test/exclusive_consumer_SUITE.erl b/test/exclusive_consumer_SUITE.erl index 8441fb6842..9aaf3df78e 100644 --- a/test/exclusive_consumer_SUITE.erl +++ b/test/exclusive_consumer_SUITE.erl @@ -32,7 +32,8 @@ groups() -> {default, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, - fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + amqp_exclusive_consume_fails_on_exclusive_consumer_queue ]} ]. @@ -58,10 +59,8 @@ end_per_testcase(Testcase, Config) -> all_messages_go_to_one_consumer(Config) -> {C, Ch} = connection_and_channel(Config), - Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}], - auto_delete = true}, + Q = queue_declare(Ch), NbMessages = 5, - #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), @@ -86,10 +85,8 @@ all_messages_go_to_one_consumer(Config) -> fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), - Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}], - auto_delete = true}, + Q = queue_declare(Ch), NbMessages = 10, - #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), @@ -130,10 +127,8 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config {C1, Ch1} = connection_and_channel(Config), {C2, Ch2} = connection_and_channel(Config), {C3, Ch3} = connection_and_channel(Config), - Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}], - auto_delete = true}, + Q = queue_declare(Ch), NbMessages = 10, - #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch1, Declare), Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]), Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]), Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]), @@ -171,11 +166,27 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]], ok. +amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch), + ?assertExit( + {{shutdown, {server_initiated_close, 403, _}}, _}, + amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true}) + ), + amqp_connection:close(C), + ok. + connection_and_channel(Config) -> C = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Ch} = amqp_connection:open_channel(C), {C, Ch}. +queue_declare(Channel) -> + Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}], + auto_delete = true}, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare), + Q. + consume({Parent, State, 0}) -> Parent ! {consumer_done, State}; consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> |
