diff options
| author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-05-25 07:46:34 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2016-08-31 09:49:58 +0100 |
| commit | 45caaaa959f93ce0c061e59fda1e94df9466f8a4 (patch) | |
| tree | f96d263314c746299b76a3406f741265281da817 /src | |
| parent | 3b0f2762cb1c526b1b4d293abfa35adabca62305 (diff) | |
| download | rabbitmq-server-git-45caaaa959f93ce0c061e59fda1e94df9466f8a4.tar.gz | |
Take update mirroring decision in rabbit_amqqueue_process
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 15 |
2 files changed, 63 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 66df42987c..e346df38dc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,6 +85,7 @@ %% e.g. message expiration messages from previously set up timers %% that may or may not be still valid args_policy_version, + mirroring_policy_version = 0, %% running | flow | idle status }). @@ -1225,22 +1226,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1383,3 +1377,50 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + +needs_update_mirroring(_Q, _Version) -> + %% hook here GaS changes on the policy + true. + +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. + +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4205fabb83..375a0366dd 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -402,15 +402,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -424,7 +421,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if |
