summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-12-17 14:41:24 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2021-12-17 14:41:24 +0000
commitd72719a1ce20b2689251e38ee357f43fabcde43c (patch)
tree708f4c57b701b569e1246d17f9405b35eca51869
parent249e8c853c08dc7e7011e1a24bff4dfe1a7175db (diff)
downloadrabbitmq-server-git-stream-coord-mnesia-update-crash.tar.gz
Stream coordinator: avoid mnesia update process crashing after deletestream-coord-mnesia-update-crash
If a delete happens shortly after a declare or other stream change there is a chance the mnesia update process that is spawned will crash when the amqqueue record cannot be recovered from durable storage. This isn't harmful but does pollute the logs.
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl26
1 files changed, 15 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 5c23fbb51d..f88822a54f 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -846,18 +846,22 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% This can happen during recovery
%% we need to re-initialise the queue record
%% if the stream id is a match
- [Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
- case amqqueue:get_type_state(Q) of
- #{name := S} when S == StreamId ->
- rabbit_log:debug("~s: initializing queue record for stream id ~s",
- [?MODULE, StreamId]),
- _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ case mnesia:dirty_read(rabbit_durable_queue, QName) of
+ [] ->
+ %% queue not found at all, it must have been deleted
ok;
- _ ->
- ok
- end,
-
- send_self_command({mnesia_updated, StreamId, Args});
+ [Q] ->
+ case amqqueue:get_type_state(Q) of
+ #{name := S} when S == StreamId ->
+ rabbit_log:debug("~s: initializing queue record for stream id ~s",
+ [?MODULE, StreamId]),
+ _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ ok;
+ _ ->
+ ok
+ end,
+ send_self_command({mnesia_updated, StreamId, Args})
+ end;
_ ->
send_self_command({mnesia_updated, StreamId, Args})
catch _:E ->