diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2018-10-24 18:19:01 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-02-01 11:23:15 +0100 |
| commit | a87f5b19164946c594fc17f1d229aeb0c2335bbb (patch) | |
| tree | 6bc096152072beb453acbfde253470163377763f /src | |
| parent | 93168ae5cc707efd524116d85bd9a62d867f6699 (diff) | |
| download | rabbitmq-server-git-a87f5b19164946c594fc17f1d229aeb0c2335bbb.tar.gz | |
Handle races with quorum_queue feature flag migration fun
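In a few places, accesses to the `rabbit_queue` and
`rabbit_durable_queue` Mnesia tables might race with the migration of
those tables performed by the `quorum_queue` feature flag migration fun.
The affected call sites now retry the operation (upgrading the
`amqqueue` record first where needed) when such a race occurs.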
[#159298729]
Diffstat (limited to 'src')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/rabbit_amqqueue.erl | 81 |
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 |
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 3 |
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 3 |
| -rw-r--r-- | src/rabbit_policy.erl | 22 |
5 files changed, 107 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ced59a8c62..b41751a267 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,8 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, + list_with_possible_retry/1]). -export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). @@ -297,6 +298,11 @@ start(Qs) -> ok. mark_local_durable_queues_stopped(VHost) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_mark_local_durable_queues_stopped(VHost), + do_mark_local_durable_queues_stopped(VHost)). + +do_mark_local_durable_queues_stopped(VHost) -> Qs = find_local_durable_classic_queues(VHost), rabbit_misc:execute_mnesia_transaction( fun() -> @@ -426,13 +432,21 @@ get_queue_type(Args) -> erlang:binary_to_existing_atom(V, utf8) end. -internal_declare(Q, true) -> +internal_declare(Q, Recover) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_internal_declare(Q, Recover), + begin + Q1 = amqqueue:upgrade(Q), + do_internal_declare(Q1, Recover) + end). 
+ +do_internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> ok = store_queue(amqqueue:set_state(Q, live)), rabbit_misc:const({created, Q}) end); -internal_declare(Q, false) -> +do_internal_declare(Q, false) -> QueueName = amqqueue:get_name(Q), rabbit_misc:execute_mnesia_tx_with_tail( fun () -> @@ -468,6 +482,14 @@ update(Name, Fun) -> %% only really used for quorum queues to ensure the rabbit_queue record %% is initialised ensure_rabbit_queue_record_is_initialized(Q) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_ensure_rabbit_queue_record_is_initialized(Q), + begin + Q1 = amqqueue:upgrade(Q), + do_ensure_rabbit_queue_record_is_initialized(Q1) + end). + +do_ensure_rabbit_queue_record_is_initialized(Q) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> ok = store_queue(Q), @@ -794,7 +816,11 @@ check_queue_type({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). +list() -> + list_with_possible_retry(fun do_list/0). + +do_list() -> + mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). list_names() -> mnesia:dirty_all_keys(rabbit_queue). @@ -828,9 +854,12 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> list(VHostPath) -> list(VHostPath, rabbit_queue). +list(VHostPath, TableName) -> + list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context -list(VHostPath, TableName) -> +do_list(VHostPath, TableName) -> mnesia:async_dirty( fun () -> mnesia:match_object( @@ -839,6 +868,38 @@ list(VHostPath, TableName) -> read) end). +list_with_possible_retry(Fun) -> + %% amqqueue migration: + %% The `rabbit_queue` or `rabbit_durable_queue` tables + %% might be migrated between the time we query the pattern + %% (with the `amqqueue` module) and the time we call + %% `mnesia:dirty_match_object()`. 
This would lead to an empty list + %% (no object matching the now incorrect pattern), not a Mnesia + %% error. + %% + %% So if the result is an empty list and the version of the + %% `amqqueue` record changed in between, we retry the operation. + %% + %% However, we don't do this if inside a Mnesia transaction: we + %% could end up with a live lock between this started transaction + %% and the Mnesia table migration which is blocked (but the + %% rabbit_feature_flags lock is held). + AmqqueueRecordVersion = amqqueue:record_version_to_use(), + case Fun() of + [] -> + case mnesia:is_transaction() of + true -> + []; + false -> + case amqqueue:record_version_to_use() of + AmqqueueRecordVersion -> []; + _ -> Fun() + end + end; + Ret -> + Ret + end. + list_down(VHostPath) -> case rabbit_vhost:exists(VHostPath) of false -> []; @@ -859,13 +920,21 @@ count(VHost) -> %% won't work here because with master migration of mirrored queues %% the "ownership" of queues by nodes becomes a non-trivial problem %% that requires a proper consensus algorithm. - length(mnesia:dirty_index_read(rabbit_queue, VHost, amqqueue:field_vhost())) + length(list_for_count(VHost)) catch _:Err -> rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n", [VHost, Err]), 0 end. +list_for_count(VHost) -> + list_with_possible_retry( + fun() -> + mnesia:dirty_index_read(rabbit_queue, + VHost, + amqqueue:field_vhost()) + end). + info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6684050dc..80230a3fa8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -294,6 +294,9 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> fun() -> [Q] = mnesia:read({rabbit_queue, QName}), Q2 = amqqueue:set_state(Q, stopped), + %% amqqueue migration: + %% The amqqueue was read from this transaction, no need + %% to handle migration. 
rabbit_amqqueue:store_queue(Q2) end), BQ:terminate(R, BQS) @@ -320,7 +323,12 @@ terminate(_Reason, State = #q{q = Q}) -> Q2 = amqqueue:set_state(Q, crashed), rabbit_misc:execute_mnesia_transaction( fun() -> - rabbit_amqqueue:store_queue(Q2) + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + rabbit_amqqueue:store_queue(Q2), + begin + Q3 = amqqueue:upgrade(Q2), + rabbit_amqqueue:store_queue(Q3) + end) end), BQS end, State). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a5736a36fc..c76bc521c2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -115,6 +115,9 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) -> GMPids1 = [{GM, Self} | GMPids0], Q2 = amqqueue:set_gm_pids(Q1, GMPids1), Q3 = amqqueue:set_state(Q2, live), + %% amqqueue migration: + %% The amqqueue was read from this transaction, no + %% need to handle migration. ok = rabbit_amqqueue:store_queue(Q3) end, ok = rabbit_misc:execute_mnesia_transaction(Fun), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9fc70527c0..6260c4319c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -300,6 +300,9 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) -> RS1 = update_recoverable(SPids, RS0), Q2 = amqqueue:set_recoverable_slaves(Q1, RS1), Q3 = amqqueue:set_state(Q2, live), + %% amqqueue migration: + %% The amqqueue was read from this transaction, no need to handle + %% migration. ok = rabbit_amqqueue:store_queue(Q3), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q3), diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index ed6ffc9ecd..e38e23a2f1 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -184,7 +184,11 @@ recover() -> %% variants. 
recover0() -> Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), - Qs = mnesia:dirty_match_object(rabbit_durable_queue, amqqueue:pattern_match_all()), + Qs = rabbit_amqqueue:list_with_possible_retry( + fun() -> + mnesia:dirty_match_object( + rabbit_durable_queue, amqqueue:pattern_match_all()) + end), Policies = list(), OpPolicies = list_op(), [rabbit_misc:execute_mnesia_transaction( @@ -203,10 +207,18 @@ recover0() -> OpPolicy1 = match(QName, OpPolicies), Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1), Q3 = rabbit_queue_decorator:set(Q2), - F = fun () -> - mnesia:write(rabbit_durable_queue, Q3, write) - end, - rabbit_misc:execute_mnesia_transaction(F) + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, Q3, write) + end), + begin + Q4 = amqqueue:upgrade(Q3), + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, Q4, write) + end) + end) end || Q0 <- Qs], ok. |
