summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2023-05-03 15:46:45 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2023-05-04 10:25:17 +0200
commitfd950aa93fbbba4fddffd0b6d648d17a3ff9eb61 (patch)
tree4ce953935c161c438585b1e040be0db2b1999b77
parenta16bf23b2525a584b53abe335e8d2ee854765626 (diff)
downloadrabbitmq-server-git-rabbitmq-server-7823-mandatory-flag-behavior.tar.gz
Return error if /amq/queue-type queue does not existrabbitmq-server-7823-mandatory-flag-behavior
When attaching an AMQP 1.0 link. References #7823
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl15
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl45
2 files changed, 58 insertions, 2 deletions
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl
index 8b14d14884..635767aa46 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl
@@ -209,6 +209,7 @@ ensure_target(Target = #'v1_0.target'{address = Address,
dest, DCh, Dest, DeclareParams,
RouteState)
end),
+ maybe_ensure_queue(Dest, DCh),
{XName, RK} = rabbit_routing_util:parse_routing(Dest),
{ok, Target, Link#incoming_link{
route_state = RouteState1,
@@ -225,6 +226,20 @@ ensure_target(Target = #'v1_0.target'{address = Address,
{error, {address_not_utf8_string, Address}}
end.
+maybe_ensure_queue({amqqueue, Q}, Ch) ->
+ try
+ rabbit_amqp1_0_channel:convert_error(
+ fun () ->
+ Method = #'queue.declare'{queue = list_to_binary(Q),
+ passive = true},
+ amqp_channel:call(Ch, Method)
+ end)
+ catch exit:#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED} ->
+ ok
+ end;
+maybe_ensure_queue(_, _) ->
+ ok.
+
incoming_flow(#incoming_link{ delivery_count = Count }, Handle) ->
#'v1_0.flow'{handle = Handle,
delivery_count = {uint, Count},
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index 4b59fc6ca1..02ae3d0994 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -25,6 +25,7 @@ groups() ->
{tests, [], [
reliable_send_receive_with_outcomes,
publishing_to_non_existing_queue_should_settle_with_released,
+ open_link_to_non_existing_destination_should_end_session,
roundtrip_classic_queue_with_drain,
roundtrip_quorum_queue_with_drain,
roundtrip_stream_queue_with_drain,
@@ -159,7 +160,7 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) ->
QName = <<Container/binary, Suffix/binary>>,
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
- Address = <<"/amq/queue/", QName/binary>>,
+ Address = <<"/exchange/amq.direct/", QName/binary>>,
OpnConf = #{address => Host,
port => Port,
@@ -184,6 +185,36 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) ->
flush("post sender close"),
ok.
+open_link_to_non_existing_destination_should_end_session(Config) ->
+ Container = atom_to_list(?FUNCTION_NAME),
+ Name = Container ++ "foo",
+ Addresses = [
+ "/exchange/" ++ Name ++ "/bar",
+ "/amq/queue/" ++ Name
+ ],
+ Host = ?config(rmq_hostname, Config),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ OpnConf = #{address => Host,
+ port => Port,
+ container_id => list_to_binary(Container),
+ sasl => {plain, <<"guest">>, <<"guest">>}},
+
+ [begin
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session(Connection),
+ SenderLinkName = <<"test-sender">>,
+ ct:pal("Address ~p", [Address]),
+ {ok, _} = amqp10_client:attach_sender_link(Session,
+ SenderLinkName,
+ list_to_binary(Address)),
+
+ wait_for_session_end(Session),
+ ok = amqp10_client:close_connection(Connection),
+ flush("post sender close")
+
+ end || Address <- Addresses],
+ ok.
+
roundtrip_classic_queue_with_drain(Config) ->
QName = atom_to_binary(?FUNCTION_NAME, utf8),
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
@@ -403,6 +434,16 @@ wait_for_credit(Sender) ->
ct:fail(credited_timeout)
end.
+wait_for_session_end(Session) ->
+ receive
+ {amqp10_event, {session, Session, {ended, _}}} ->
+ flush(?FUNCTION_NAME),
+ ok
+ after 5000 ->
+ flush("wait_for_session_end timed out"),
+ ct:fail(settled_timeout)
+ end.
+
wait_for_settlement(Tag) ->
wait_for_settlement(Tag, accepted).
@@ -413,7 +454,7 @@ wait_for_settlement(Tag, State) ->
ok
after 5000 ->
flush("wait_for_settlement timed out"),
- ct:fail(credited_timeout)
+ ct:fail(settled_timeout)
end.
wait_for_accepts(0) -> ok;