diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 17 |
2 files changed, 61 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3ee14e4f7d..bf7484557d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,6 +54,7 @@ max_length, max_bytes, args_policy_version, + mirroring_policy_version = 0, status }). @@ -1207,22 +1208,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1381,9 +1375,52 @@ log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) -> Reason, [QName, VHost]). +needs_update_mirroring(_Q, _Version) -> + %% hook here GaS changes on the policy + true. + +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 849efa3611..b188298a9b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -64,6 +64,8 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(update_mirrors/1 :: + (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). @@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if |
