summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2018-10-24 18:19:01 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-02-01 11:23:15 +0100
commita87f5b19164946c594fc17f1d229aeb0c2335bbb (patch)
tree6bc096152072beb453acbfde253470163377763f /src
parent93168ae5cc707efd524116d85bd9a62d867f6699 (diff)
downloadrabbitmq-server-git-a87f5b19164946c594fc17f1d229aeb0c2335bbb.tar.gz
Handle races with quorum_queue feature flag migration fun
In a few places, the migration of the `rabbit_queue` and `rabbit_durable_queue` Mnesia tables might conflict with accesses to those tables. [#159298729]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl81
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl3
-rw-r--r--src/rabbit_policy.erl22
5 files changed, 107 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ced59a8c62..b41751a267 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,7 +30,8 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
--export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
+-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
+ list_with_possible_retry/1]).
-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@@ -297,6 +298,11 @@ start(Qs) ->
ok.
mark_local_durable_queues_stopped(VHost) ->
+ ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
+ do_mark_local_durable_queues_stopped(VHost),
+ do_mark_local_durable_queues_stopped(VHost)).
+
+do_mark_local_durable_queues_stopped(VHost) ->
Qs = find_local_durable_classic_queues(VHost),
rabbit_misc:execute_mnesia_transaction(
fun() ->
@@ -426,13 +432,21 @@ get_queue_type(Args) ->
erlang:binary_to_existing_atom(V, utf8)
end.
-internal_declare(Q, true) ->
+internal_declare(Q, Recover) ->
+ ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
+ do_internal_declare(Q, Recover),
+ begin
+ Q1 = amqqueue:upgrade(Q),
+ do_internal_declare(Q1, Recover)
+ end).
+
+do_internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
ok = store_queue(amqqueue:set_state(Q, live)),
rabbit_misc:const({created, Q})
end);
-internal_declare(Q, false) ->
+do_internal_declare(Q, false) ->
QueueName = amqqueue:get_name(Q),
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
@@ -468,6 +482,14 @@ update(Name, Fun) ->
%% only really used for quorum queues to ensure the rabbit_queue record
%% is initialised
ensure_rabbit_queue_record_is_initialized(Q) ->
+ ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
+ do_ensure_rabbit_queue_record_is_initialized(Q),
+ begin
+ Q1 = amqqueue:upgrade(Q),
+ do_ensure_rabbit_queue_record_is_initialized(Q1)
+ end).
+
+do_ensure_rabbit_queue_record_is_initialized(Q) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
ok = store_queue(Q),
@@ -794,7 +816,11 @@ check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
+list() ->
+ list_with_possible_retry(fun do_list/0).
+
+do_list() ->
+ mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
@@ -828,9 +854,12 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
list(VHostPath) ->
list(VHostPath, rabbit_queue).
+list(VHostPath, TableName) ->
+ list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
-list(VHostPath, TableName) ->
+do_list(VHostPath, TableName) ->
mnesia:async_dirty(
fun () ->
mnesia:match_object(
@@ -839,6 +868,38 @@ list(VHostPath, TableName) ->
read)
end).
+list_with_possible_retry(Fun) ->
+ %% amqqueue migration:
+ %% The `rabbit_queue` or `rabbit_durable_queue` tables
+ %% might be migrated between the time we query the pattern
+ %% (with the `amqqueue` module) and the time we call
+ %% `mnesia:dirty_match_object()`. This would lead to an empty list
+ %% (no object matching the now incorrect pattern), not a Mnesia
+ %% error.
+ %%
+ %% So if the result is an empty list and the version of the
+ %% `amqqueue` record changed in between, we retry the operation.
+ %%
+ %% However, we don't do this if inside a Mnesia transaction: we
+ %% could end up with a live lock between this started transaction
+ %% and the Mnesia table migration which is blocked (but the
+ %% rabbit_feature_flags lock is held).
+ AmqqueueRecordVersion = amqqueue:record_version_to_use(),
+ case Fun() of
+ [] ->
+ case mnesia:is_transaction() of
+ true ->
+ [];
+ false ->
+ case amqqueue:record_version_to_use() of
+ AmqqueueRecordVersion -> [];
+ _ -> Fun()
+ end
+ end;
+ Ret ->
+ Ret
+ end.
+
list_down(VHostPath) ->
case rabbit_vhost:exists(VHostPath) of
false -> [];
@@ -859,13 +920,21 @@ count(VHost) ->
%% won't work here because with master migration of mirrored queues
%% the "ownership" of queues by nodes becomes a non-trivial problem
%% that requires a proper consensus algorithm.
- length(mnesia:dirty_index_read(rabbit_queue, VHost, amqqueue:field_vhost()))
+ length(list_for_count(VHost))
catch _:Err ->
rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n",
[VHost, Err]),
0
end.
+list_for_count(VHost) ->
+ list_with_possible_retry(
+ fun() ->
+ mnesia:dirty_index_read(rabbit_queue,
+ VHost,
+ amqqueue:field_vhost())
+ end).
+
info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c6684050dc..80230a3fa8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -294,6 +294,9 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
fun() ->
[Q] = mnesia:read({rabbit_queue, QName}),
Q2 = amqqueue:set_state(Q, stopped),
+ %% amqqueue migration:
+ %% The amqqueue was read from this transaction, no need
+ %% to handle migration.
rabbit_amqqueue:store_queue(Q2)
end),
BQ:terminate(R, BQS)
@@ -320,7 +323,12 @@ terminate(_Reason, State = #q{q = Q}) ->
Q2 = amqqueue:set_state(Q, crashed),
rabbit_misc:execute_mnesia_transaction(
fun() ->
- rabbit_amqqueue:store_queue(Q2)
+ ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
+ rabbit_amqqueue:store_queue(Q2),
+ begin
+ Q3 = amqqueue:upgrade(Q2),
+ rabbit_amqqueue:store_queue(Q3)
+ end)
end),
BQS
end, State).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a5736a36fc..c76bc521c2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -115,6 +115,9 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
GMPids1 = [{GM, Self} | GMPids0],
Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
Q3 = amqqueue:set_state(Q2, live),
+ %% amqqueue migration:
+ %% The amqqueue was read from this transaction, no
+ %% need to handle migration.
ok = rabbit_amqqueue:store_queue(Q3)
end,
ok = rabbit_misc:execute_mnesia_transaction(Fun),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9fc70527c0..6260c4319c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -300,6 +300,9 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
RS1 = update_recoverable(SPids, RS0),
Q2 = amqqueue:set_recoverable_slaves(Q1, RS1),
Q3 = amqqueue:set_state(Q2, live),
+ %% amqqueue migration:
+ %% The amqqueue was read from this transaction, no need to handle
+ %% migration.
ok = rabbit_amqqueue:store_queue(Q3),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q3),
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index ed6ffc9ecd..e38e23a2f1 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -184,7 +184,11 @@ recover() ->
%% variants.
recover0() ->
Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
- Qs = mnesia:dirty_match_object(rabbit_durable_queue, amqqueue:pattern_match_all()),
+ Qs = rabbit_amqqueue:list_with_possible_retry(
+ fun() ->
+ mnesia:dirty_match_object(
+ rabbit_durable_queue, amqqueue:pattern_match_all())
+ end),
Policies = list(),
OpPolicies = list_op(),
[rabbit_misc:execute_mnesia_transaction(
@@ -203,10 +207,18 @@ recover0() ->
OpPolicy1 = match(QName, OpPolicies),
Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
Q3 = rabbit_queue_decorator:set(Q2),
- F = fun () ->
- mnesia:write(rabbit_durable_queue, Q3, write)
- end,
- rabbit_misc:execute_mnesia_transaction(F)
+ ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_queue, Q3, write)
+ end),
+ begin
+ Q4 = amqqueue:upgrade(Q3),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_queue, Q4, write)
+ end)
+ end)
end || Q0 <- Qs],
ok.