diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2023-05-03 15:46:45 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2023-05-04 10:25:17 +0200 |
commit | fd950aa93fbbba4fddffd0b6d648d17a3ff9eb61 (patch) | |
tree | 4ce953935c161c438585b1e040be0db2b1999b77 | |
parent | a16bf23b2525a584b53abe335e8d2ee854765626 (diff) | |
download | rabbitmq-server-git-rabbitmq-server-7823-mandatory-flag-behavior.tar.gz |
Return error if /amq/queue-type queue does not existrabbitmq-server-7823-mandatory-flag-behavior
When attaching an AMQP 1.0 link.
References #7823
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl | 15 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl | 45 |
2 files changed, 58 insertions, 2 deletions
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl index 8b14d14884..635767aa46 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl @@ -209,6 +209,7 @@ ensure_target(Target = #'v1_0.target'{address = Address, dest, DCh, Dest, DeclareParams, RouteState) end), + maybe_ensure_queue(Dest, DCh), {XName, RK} = rabbit_routing_util:parse_routing(Dest), {ok, Target, Link#incoming_link{ route_state = RouteState1, @@ -225,6 +226,20 @@ ensure_target(Target = #'v1_0.target'{address = Address, {error, {address_not_utf8_string, Address}} end. +maybe_ensure_queue({amqqueue, Q}, Ch) -> + try + rabbit_amqp1_0_channel:convert_error( + fun () -> + Method = #'queue.declare'{queue = list_to_binary(Q), + passive = true}, + amqp_channel:call(Ch, Method) + end) + catch exit:#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED} -> + ok + end; +maybe_ensure_queue(_, _) -> + ok. + incoming_flow(#incoming_link{ delivery_count = Count }, Handle) -> #'v1_0.flow'{handle = Handle, delivery_count = {uint, Count}, diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 4b59fc6ca1..02ae3d0994 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -25,6 +25,7 @@ groups() -> {tests, [], [ reliable_send_receive_with_outcomes, publishing_to_non_existing_queue_should_settle_with_released, + open_link_to_non_existing_destination_should_end_session, roundtrip_classic_queue_with_drain, roundtrip_quorum_queue_with_drain, roundtrip_stream_queue_with_drain, @@ -159,7 +160,7 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) -> QName = <<Container/binary, Suffix/binary>>, Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - Address = <<"/amq/queue/", QName/binary>>, + Address = <<"/exchange/amq.direct/", QName/binary>>, OpnConf = #{address => Host, port => Port, @@ -184,6 +185,36 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) -> flush("post sender close"), ok. +open_link_to_non_existing_destination_should_end_session(Config) -> + Container = atom_to_list(?FUNCTION_NAME), + Name = Container ++ "foo", + Addresses = [ + "/exchange/" ++ Name ++ "/bar", + "/amq/queue/" ++ Name + ], + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpnConf = #{address => Host, + port => Port, + container_id => list_to_binary(Container), + sasl => {plain, <<"guest">>, <<"guest">>}}, + + [begin + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + SenderLinkName = <<"test-sender">>, + ct:pal("Address ~p", [Address]), + {ok, _} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + list_to_binary(Address)), + + wait_for_session_end(Session), + ok = amqp10_client:close_connection(Connection), + flush("post sender close") + + end || Address <- Addresses], + ok. + roundtrip_classic_queue_with_drain(Config) -> QName = atom_to_binary(?FUNCTION_NAME, utf8), roundtrip_queue_with_drain(Config, <<"classic">>, QName). @@ -403,6 +434,16 @@ wait_for_credit(Sender) -> ct:fail(credited_timeout) end. +wait_for_session_end(Session) -> + receive + {amqp10_event, {session, Session, {ended, _}}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_session_end timed out"), + ct:fail(settled_timeout) + end. + wait_for_settlement(Tag) -> wait_for_settlement(Tag, accepted). @@ -413,7 +454,7 @@ wait_for_settlement(Tag, State) -> ok after 5000 -> flush("wait_for_settlement timed out"), - ct:fail(credited_timeout) + ct:fail(settled_timeout) end. wait_for_accepts(0) -> ok; |