diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-04 10:02:53 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-04 10:02:53 +0100 |
commit | 3035b18907154b99560fcad011f3e430535f6749 (patch) | |
tree | 9e0736536bc4bf76d549b5f6daf0e8edff9ca7f0 | |
parent | cbd3c8dfddd66930b43d85c8585fc87c9fb87215 (diff) | |
parent | 46f9409e305006767e42a810323415f21c8f7384 (diff) | |
download | rabbitmq-server-git-rabbitmq-stream-management.tar.gz |
Merge branch 'rabbitmq-stream-management' of github.com:rabbitmq/rabbitmq-server into rabbitmq-stream-managementrabbitmq-stream-management
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 1858617f5d..f25e31be32 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -119,20 +119,31 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) none, true, false, none, StreamQueueArguments, VirtualHost, #{user => Username}, rabbit_stream_queue ), - try - case rabbit_stream_queue:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, State}; - {existing, _} -> - {reply, {error, reference_already_exists}, State}; - {protocol_error, Type, Reason, Args} -> - rabbit_log:warning("Error while creating ~p stream, ~p (~p)~n", - [Reference, Type, rabbit_misc:format(Reason, Args)]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + case rabbit_amqqueue:with( + Name, + fun(Q) -> + ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none) + end) of + ok -> + {reply, {error, reference_already_exists}, State}; + {error, not_found} -> + try + case rabbit_stream_queue:declare(Q0, node()) of + {new, Q} -> + {reply, {ok, amqqueue:get_type_state(Q)}, State}; + {existing, _} -> + {reply, {error, reference_already_exists}, State}; + {error, Err} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]), + {reply, {error, internal_error}, State} + end + catch + exit:Error -> + rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + {reply, {error, internal_error}, State} + end; + {error, {absent, _, Reason}} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]), {reply, {error, internal_error}, State} end; error -> |