summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Buchwald <bbuchwald@boxfort.com>2020-02-26 10:04:11 -0500
committerLuke Bakken <lbakken@pivotal.io>2020-11-17 09:07:41 -0800
commite35724c61e425934144655307d61bf38025d40b5 (patch)
tree06f1f68b3735d3117f25911e105c1c56313aa59e
parent778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff)
downloadrabbitmq-server-git-rabbitmq-erlang-client-gh-131-monorepo.tar.gz
Allow declaring exchanges from STOMPrabbitmq-erlang-client-gh-131-monorepo
The STOMP plugin uses rabbit_routing_util:ensure_endpoint to declare a queue based on a destination path. However, if that path is an exchange, currently it will not declare the exchange, it must already exist. With this change, if a declare_exchange parameter is passed and set to true, the destination exchange will be declared. Additional parameters may be passed to specify exchange properties: type, durable, auto_delete, and arguments. All of these must be prefixed with exchange_ to distinguish them from parameters that specify similarly named properties of the queue the exchange will be bound to. (cherry picked from commit b51472244a41666762d65b8ebd797a8a382b7772)
-rw-r--r--deps/amqp_client/src/rabbit_routing_util.erl62
1 files changed, 51 insertions, 11 deletions
diff --git a/deps/amqp_client/src/rabbit_routing_util.erl b/deps/amqp_client/src/rabbit_routing_util.erl
index 9d64a1468e..7396de7faa 100644
--- a/deps/amqp_client/src/rabbit_routing_util.erl
+++ b/deps/amqp_client/src/rabbit_routing_util.erl
@@ -74,8 +74,7 @@ ensure_endpoint(Dir, Channel, Endpoint, State) ->
ensure_endpoint(Dir, Channel, Endpoint, [], State).
ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) ->
- check_exchange(Name, Channel,
- proplists:get_value(check_exchange, Params, false)),
+ declare_exchange(Name, Channel, Params),
Method = queue_declare_method(#'queue.declare'{}, exchange, Params),
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
{ok, Queue, State};
@@ -106,8 +105,7 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) ->
{ok, Queue, State1};
ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) ->
- check_exchange(Name, Channel,
- proplists:get_value(check_exchange, Params, false)),
+ declare_exchange(Name, Channel, Params),
{ok, undefined, State};
ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) ->
@@ -155,13 +153,55 @@ dest_temp_queue(_) -> none.
%% --------------------------------------------------------------------------
-check_exchange(_, _, false) ->
- ok;
-check_exchange(ExchangeName, Channel, true) ->
- XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName),
- passive = true },
- #'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl),
- ok.
+update_exchange_declare_type(Method, Params) ->
+ case proplists:get_value(exchange_type, Params) of
+ undefined -> Method;
+ Val -> Method#'exchange.declare'{type = Val}
+ end.
+
+update_exchange_declare_durable(Method, Params) ->
+ case proplists:get_value(exchange_durable, Params) of
+ undefined -> Method;
+ Val -> Method#'exchange.declare'{durable = Val}
+ end.
+
+update_exchange_declare_auto_delete(Method, Params) ->
+ case proplists:get_value(exchange_auto_delete, Params) of
+ undefined -> Method;
+ Val -> Method#'exchange.declare'{auto_delete = Val}
+ end.
+
+update_exchange_declare_arguments(Method, Params) ->
+ Method#'exchange.declare'{arguments =
+ proplists:get_value(exchange_arguments, Params, [])}.
+
+exchange_declare_method(#'exchange.declare'{} = Method, Params) ->
+ %% set exchange.declare fields from Params
+ lists:foldl(fun (F, Acc) -> F(Acc, Params) end,
+ Method, [fun update_exchange_declare_type/2,
+ fun update_exchange_declare_durable/2,
+ fun update_exchange_declare_auto_delete/2,
+ fun update_exchange_declare_arguments/2]).
+
+declare_exchange(ExchangeName, Channel, Params) ->
+ Method = case proplists:get_value(declare_exchange, Params, false) of
+ true -> exchange_declare_method(
+ #'exchange.declare'{ exchange = list_to_binary(ExchangeName) },
+ Params);
+ false ->
+ case proplists:get_value(check_exchange, Params, false) of
+ true -> #'exchange.declare'{
+ exchange = list_to_binary(ExchangeName),
+ passive = true };
+ false -> none
+ end
+ end,
+
+ case Method of
+ none -> ok;
+ _ -> #'exchange.declare_ok'{} = amqp_channel:call(Channel, Method),
+ ok
+ end.
update_queue_declare_arguments(Method, Params) ->
Method#'queue.declare'{arguments =