diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-12-23 12:18:33 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-12-23 12:18:33 +0100 |
commit | fc88f3ba21a2b932cb70dc3ffd28bcd501ee6ff1 (patch) | |
tree | 0d79b00e817b3f9b2f15e4fae03fbc0fbff6463c | |
parent | 7ce567962f189dad7aba9da4e7167ad79dd66c4a (diff) | |
download | rabbitmq-server-git-stream-manager-check-queue-exists.tar.gz |
Check that the queue exists before calling declarestream-manager-check-queue-exists
Rabbit channels are responsible of this check before calling declare,
skipping it on the manager meant that the queue was partly redeclared
and a new data directory created. The old one was still on disk with
a different timestamp, but from the user point of view the queue data
has been erased.
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 1858617f5d..f25e31be32 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -119,20 +119,31 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) none, true, false, none, StreamQueueArguments, VirtualHost, #{user => Username}, rabbit_stream_queue ), - try - case rabbit_stream_queue:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, State}; - {existing, _} -> - {reply, {error, reference_already_exists}, State}; - {protocol_error, Type, Reason, Args} -> - rabbit_log:warning("Error while creating ~p stream, ~p (~p)~n", - [Reference, Type, rabbit_misc:format(Reason, Args)]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + case rabbit_amqqueue:with( + Name, + fun(Q) -> + ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none) + end) of + ok -> + {reply, {error, reference_already_exists}, State}; + {error, not_found} -> + try + case rabbit_stream_queue:declare(Q0, node()) of + {new, Q} -> + {reply, {ok, amqqueue:get_type_state(Q)}, State}; + {existing, _} -> + {reply, {error, reference_already_exists}, State}; + {error, Err} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]), + {reply, {error, internal_error}, State} + end + catch + exit:Error -> + rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + {reply, {error, internal_error}, State} + end; + {error, {absent, _, Reason}} -> + rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]), {reply, {error, internal_error}, State} end; error -> |