diff options
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 1858617f5d..f25e31be32 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -119,20 +119,31 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) none, true, false, none, StreamQueueArguments, VirtualHost, #{user => Username}, rabbit_stream_queue ), - try - case rabbit_stream_queue:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, State}; - {existing, _} -> - {reply, {error, reference_already_exists}, State}; - {protocol_error, Type, Reason, Args} -> - rabbit_log:warning("Error while creating ~p stream, ~p (~p)~n", - [Reference, Type, rabbit_misc:format(Reason, Args)]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + case rabbit_amqqueue:with( + Name, + fun(Q) -> + ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none) + end) of + ok -> + {reply, {error, reference_already_exists}, State}; + {error, not_found} -> + try + case rabbit_stream_queue:declare(Q0, node()) of + {new, Q} -> + {reply, {ok, amqqueue:get_type_state(Q)}, State}; + {existing, _} -> + {reply, {error, reference_already_exists}, State}; + {error, Err} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]), + {reply, {error, internal_error}, State} + end + catch + exit:Error -> + rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + {reply, {error, internal_error}, State} + end; + {error, {absent, _, Reason}} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]), {reply, {error, internal_error}, State} end; error -> |