diff options
author | Ben Buchwald <bbuchwald@boxfort.com> | 2020-02-26 10:04:11 -0500 |
---|---|---|
committer | Luke Bakken <lbakken@pivotal.io> | 2020-11-17 09:07:41 -0800 |
commit | e35724c61e425934144655307d61bf38025d40b5 (patch) | |
tree | 06f1f68b3735d3117f25911e105c1c56313aa59e | |
parent | 778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff) | |
download | rabbitmq-server-git-rabbitmq-erlang-client-gh-131-monorepo.tar.gz |
Allow declaring exchanges from STOMPrabbitmq-erlang-client-gh-131-monorepo
The STOMP plugin uses rabbit_routing_util:ensure_endpoint to declare a queue
based on a destination path. However, if that path is an exchange, currently
it will not declare the exchange, it must already exist. With this change, if
a declare_exchange parameter is passed and set to true, the destination
exchange will be declared. Additional parameters may be passed to specify
exchange properties: type, durable, auto_delete, and arguments. All of these
must be prefixed with exchange_ to distinguish them from parameters that
specify similarly named properties of the queue the exchange will be bound to.
(cherry picked from commit b51472244a41666762d65b8ebd797a8a382b7772)
-rw-r--r-- | deps/amqp_client/src/rabbit_routing_util.erl | 62 |
1 files changed, 51 insertions, 11 deletions
diff --git a/deps/amqp_client/src/rabbit_routing_util.erl b/deps/amqp_client/src/rabbit_routing_util.erl index 9d64a1468e..7396de7faa 100644 --- a/deps/amqp_client/src/rabbit_routing_util.erl +++ b/deps/amqp_client/src/rabbit_routing_util.erl @@ -74,8 +74,7 @@ ensure_endpoint(Dir, Channel, Endpoint, State) -> ensure_endpoint(Dir, Channel, Endpoint, [], State). ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, - proplists:get_value(check_exchange, Params, false)), + declare_exchange(Name, Channel, Params), Method = queue_declare_method(#'queue.declare'{}, exchange, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; @@ -106,8 +105,7 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> {ok, Queue, State1}; ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, - proplists:get_value(check_exchange, Params, false)), + declare_exchange(Name, Channel, Params), {ok, undefined, State}; ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> @@ -155,13 +153,55 @@ dest_temp_queue(_) -> none. %% -------------------------------------------------------------------------- -check_exchange(_, _, false) -> - ok; -check_exchange(ExchangeName, Channel, true) -> - XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName), - passive = true }, - #'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl), - ok. +update_exchange_declare_type(Method, Params) -> + case proplists:get_value(exchange_type, Params) of + undefined -> Method; + Val -> Method#'exchange.declare'{type = Val} + end. + +update_exchange_declare_durable(Method, Params) -> + case proplists:get_value(exchange_durable, Params) of + undefined -> Method; + Val -> Method#'exchange.declare'{durable = Val} + end. + +update_exchange_declare_auto_delete(Method, Params) -> + case proplists:get_value(exchange_auto_delete, Params) of + undefined -> Method; + Val -> Method#'exchange.declare'{auto_delete = Val} + end. + +update_exchange_declare_arguments(Method, Params) -> + Method#'exchange.declare'{arguments = + proplists:get_value(exchange_arguments, Params, [])}. + +exchange_declare_method(#'exchange.declare'{} = Method, Params) -> + %% set exchange.declare fields from Params + lists:foldl(fun (F, Acc) -> F(Acc, Params) end, + Method, [fun update_exchange_declare_type/2, + fun update_exchange_declare_durable/2, + fun update_exchange_declare_auto_delete/2, + fun update_exchange_declare_arguments/2]). + +declare_exchange(ExchangeName, Channel, Params) -> + Method = case proplists:get_value(declare_exchange, Params, false) of + true -> exchange_declare_method( + #'exchange.declare'{ exchange = list_to_binary(ExchangeName) }, + Params); + false -> + case proplists:get_value(check_exchange, Params, false) of + true -> #'exchange.declare'{ + exchange = list_to_binary(ExchangeName), + passive = true }; + false -> none + end + end, + + case Method of + none -> ok; + _ -> #'exchange.declare_ok'{} = amqp_channel:call(Channel, Method), + ok + end. update_queue_declare_arguments(Method, Params) -> Method#'queue.declare'{arguments = |