summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-12-23 12:18:33 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-12-23 12:18:33 +0100
commitfc88f3ba21a2b932cb70dc3ffd28bcd501ee6ff1 (patch)
tree0d79b00e817b3f9b2f15e4fae03fbc0fbff6463c
parent7ce567962f189dad7aba9da4e7167ad79dd66c4a (diff)
downloadrabbitmq-server-git-stream-manager-check-queue-exists.tar.gz
Check that the queue exists before calling declarestream-manager-check-queue-exists
Rabbit channels are responsible of this check before calling declare, skipping it on the manager meant that the queue was partly redeclared and a new data directory created. The old one was still on disk with a different timestamp, but from the user point of view the queue data has been erased.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl39
1 files changed, 25 insertions, 14 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 1858617f5d..f25e31be32 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -119,20 +119,31 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State)
none, true, false, none, StreamQueueArguments,
VirtualHost, #{user => Username}, rabbit_stream_queue
),
- try
- case rabbit_stream_queue:declare(Q0, node()) of
- {new, Q} ->
- {reply, {ok, amqqueue:get_type_state(Q)}, State};
- {existing, _} ->
- {reply, {error, reference_already_exists}, State};
- {protocol_error, Type, Reason, Args} ->
- rabbit_log:warning("Error while creating ~p stream, ~p (~p)~n",
- [Reference, Type, rabbit_misc:format(Reason, Args)]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:Error ->
- rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ case rabbit_amqqueue:with(
+ Name,
+ fun(Q) ->
+ ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none)
+ end) of
+ ok ->
+ {reply, {error, reference_already_exists}, State};
+ {error, not_found} ->
+ try
+ case rabbit_stream_queue:declare(Q0, node()) of
+ {new, Q} ->
+ {reply, {ok, amqqueue:get_type_state(Q)}, State};
+ {existing, _} ->
+ {reply, {error, reference_already_exists}, State};
+ {error, Err} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]),
+ {reply, {error, internal_error}, State}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ {reply, {error, internal_error}, State}
+ end;
+ {error, {absent, _, Reason}} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]),
{reply, {error, internal_error}, State}
end;
error ->