summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl39
1 files changed, 25 insertions, 14 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 1858617f5d..f25e31be32 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -119,20 +119,31 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State)
none, true, false, none, StreamQueueArguments,
VirtualHost, #{user => Username}, rabbit_stream_queue
),
- try
- case rabbit_stream_queue:declare(Q0, node()) of
- {new, Q} ->
- {reply, {ok, amqqueue:get_type_state(Q)}, State};
- {existing, _} ->
- {reply, {error, reference_already_exists}, State};
- {protocol_error, Type, Reason, Args} ->
- rabbit_log:warning("Error while creating ~p stream, ~p (~p)~n",
- [Reference, Type, rabbit_misc:format(Reason, Args)]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:Error ->
- rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ case rabbit_amqqueue:with(
+ Name,
+ fun(Q) ->
+ ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none)
+ end) of
+ ok ->
+ {reply, {error, reference_already_exists}, State};
+ {error, not_found} ->
+ try
+ case rabbit_stream_queue:declare(Q0, node()) of
+ {new, Q} ->
+ {reply, {ok, amqqueue:get_type_state(Q)}, State};
+ {existing, _} ->
+ {reply, {error, reference_already_exists}, State};
+ {error, Err} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]),
+ {reply, {error, internal_error}, State}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ {reply, {error, internal_error}, State}
+ end;
+ {error, {absent, _, Reason}} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]),
{reply, {error, internal_error}, State}
end;
error ->