diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 5 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 15 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 14 |
5 files changed, 38 insertions, 1 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 68337adcaa..0f77c2726f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -263,6 +263,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% credit for unknown consumer - just ignore {State0, ok} end; +apply(_, #checkout{spec = {dequeue, _}}, + #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> + {State0, {error, unsupported}}; apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, meta = ConsumerMeta, consumer_id = ConsumerId}, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c85dd7266a..c6af9bf10d 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -199,6 +199,8 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {ok, empty, State0#state{leader = Leader}}; {ok, {dequeue, Msg, NumReady}, Leader} -> {ok, {Msg, NumReady}, State0#state{leader = Leader}}; + {ok, {error, _} = Err, _Leader} -> + Err; Err -> Err end. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 75857f81a8..d15e17b00b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -482,6 +482,11 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> IsDelivered = Count > 0, Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState}; + {error, unsupported} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain access to locked ~s. basic.get operations are not supported by quorum queues with single active consumer", + [rabbit_misc:rs(QName)]); {error, _} = Err -> Err; {timeout, _} -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 1714d57932..2de6a86e84 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -640,6 +640,21 @@ down_noconnection_returns_checked_out_test(_) -> ?assertEqual(lists:sort(Returns), Returns), ok. +single_active_consumer_basic_get_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)), + {State1, _} = enq(1, 1, first, State0), + {_State, {error, unsupported}} = + apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + State1), + ok. + single_active_consumer_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 09d0886bf6..128d880eb6 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -40,7 +40,8 @@ groups() -> all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, - fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks, + basic_get_is_unsupported %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -267,6 +268,17 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]], ok. +basic_get_is_unsupported(Config) -> + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 405, _}}, _}, + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false})), + + amqp_connection:close(C), + ok. + amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) -> {C, Ch} = connection_and_channel(Config), Q = queue_declare(Ch, Config), |
