summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2019-11-20 10:24:12 +0000
committerdcorbacho <dparracorbacho@piotal.io>2019-11-20 13:29:23 +0000
commitcb4989f31d56a072d7b12cf11f08431701e5a4e2 (patch)
treefe7fe5b8ee98c66c9a8c81ab1cc0dc80a552b7d6
parente5f2dfafce3b5c75b11fb76b7ddf464c5358ad41 (diff)
downloadrabbitmq-server-git-cb4989f31d56a072d7b12cf11f08431701e5a4e2.tar.gz
Reject basic.get operations on quorum queues with single active consumer enabled
rabbitmq-server/2164 [#169810347]
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--src/rabbit_fifo_client.erl2
-rw-r--r--src/rabbit_quorum_queue.erl5
-rw-r--r--test/rabbit_fifo_SUITE.erl15
-rw-r--r--test/single_active_consumer_SUITE.erl14
5 files changed, 38 insertions, 1 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 68337adcaa..0f77c2726f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -263,6 +263,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
+apply(_, #checkout{spec = {dequeue, _}},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ {State0, {error, unsupported}};
apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
meta = ConsumerMeta,
consumer_id = ConsumerId},
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c85dd7266a..c6af9bf10d 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -199,6 +199,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, empty, State0#state{leader = Leader}};
{ok, {dequeue, Msg, NumReady}, Leader} ->
{ok, {Msg, NumReady}, State0#state{leader = Leader}};
+ {ok, {error, _} = Err, _Leader} ->
+ Err;
Err ->
Err
end.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 75857f81a8..d15e17b00b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -482,6 +482,11 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
+ {error, unsupported} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain access to locked ~s. basic.get operations are not supported by quorum queues with single active consumer",
+ [rabbit_misc:rs(QName)]);
{error, _} = Err ->
Err;
{timeout, _} ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 1714d57932..2de6a86e84 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -640,6 +640,21 @@ down_noconnection_returns_checked_out_test(_) ->
?assertEqual(lists:sort(Returns), Returns),
ok.
+single_active_consumer_basic_get_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)),
+ {State1, _} = enq(1, 1, first, State0),
+ {_State, {error, unsupported}} =
+ apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State1),
+ ok.
+
single_active_consumer_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 09d0886bf6..128d880eb6 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -40,7 +40,8 @@ groups() ->
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
- fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks,
+ basic_get_is_unsupported
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@@ -267,6 +268,17 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
[amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
ok.
+basic_get_is_unsupported(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 405, _}}, _},
+ amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false})),
+
+ amqp_connection:close(C),
+ ok.
+
amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),