diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 14:30:55 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 14:30:55 +0000 |
| commit | 4f849e262b7ff7327e913b27d00ce83a6b4e0e43 (patch) | |
| tree | 6f591b27b81832bd68fa603ad23010a57dba6cc8 | |
| parent | 7686980bd151e5d3c46220bfcfcdc1b01d25b821 (diff) | |
| parent | 596aae238c097155b24fbc81810be33d96c63388 (diff) | |
| download | rabbitmq-server-git-4f849e262b7ff7327e913b27d00ce83a6b4e0e43.tar.gz | |
Merge in default
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 85 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 44 |
4 files changed, 151 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7827b839d5..c1884118a9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -173,6 +173,9 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) -> + 'ok' | rabbit_types:error('queue_has_pending_acks') + | rabbit_types:error('queue_not_mirrored')). -endif. @@ -590,6 +593,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). +sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe14baa728..60857e7e3d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1154,6 +1154,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); +handle_call(sync_mirrors, From, + State = #q{q = #amqqueue{name = Name}, + backing_queue = rabbit_mirror_queue_master = BQ, + backing_queue_state = BQS}) -> + case BQ:depth(BQS) - BQ:len(BQS) of + 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + gen_server2:reply(From, ok), + noreply(State#q{backing_queue_state = + rabbit_mirror_queue_master:sync_mirrors( + SPids -- SSPids, Name, BQS)}); + _ -> reply({error, queue_has_pending_acks}, State) + end; + +handle_call(sync_mirrors, _From, State) -> + reply({error, queue_not_mirrored}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8fcd1893a6..64b78fbbdb 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -45,6 +45,8 @@ known_senders }). +-define(SYNC_PROGRESS_INTERVAL, 1000000). + -ifdef(use_specs). -export_type([death_fun/0, depth_fun/0]). @@ -127,6 +129,87 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. +sync_mirrors([], Name, State) -> + rabbit_log:info("Synchronising ~s: nothing to do~n", + [rabbit_misc:rs(Name)]), + State; +sync_mirrors(SPids, Name, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n", + [rabbit_misc:rs(Name), SPids, BQ:len(BQS)]), + Ref = make_ref(), + %% We send the start over GM to flush out any other messages that + %% we might have sent that way already. + gm:broadcast(GM, {sync_start, Ref, self(), SPids}), + SPidsMRefs = [begin + MRef = erlang:monitor(process, SPid), + {SPid, MRef} + end || SPid <- SPids], + %% We wait for a reply from the slaves so that we know they are in + %% a receive block and will thus receive messages we send to them + %% *without* those messages ending up in their gen_server2 pqueue. + SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), + {{_, SPidsMRefs2, _}, BQS1} = + BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> + SPMR1 = wait_for_credit(SPMR, Ref), + [begin + credit_flow:send(SPid, ?CREDIT_DISC_BOUND), + SPid ! {sync_message, Ref, Msg, MsgProps} + end || {SPid, _} <- SPMR1], + {I + 1, SPMR1, + case timer:now_diff(erlang:now(), Last) > + ?SYNC_PROGRESS_INTERVAL of + true -> rabbit_log:info( + "Synchronising ~s: ~p messages~n", + [rabbit_misc:rs(Name), I]), + erlang:now(); + false -> Last + end} + end, {0, SPidsMRefs1, erlang:now()}, BQS), + sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), + rabbit_log:info("Synchronising ~s: complete~n", + [rabbit_misc:rs(Name)]), + State#state{backing_queue_state = BQS1}. + +wait_for_credit(SPidsMRefs, Ref) -> + case credit_flow:blocked() of + true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, + fun sync_receive_credit/3), Ref); + false -> SPidsMRefs + end. + +sync_foreach(SPidsMRefs, Ref, Fun) -> + [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, + SPid1 <- [Fun(SPid, MRef, Ref)], + SPid1 =/= dead]. + +sync_receive_ready(SPid, MRef, Ref) -> + receive + {sync_ready, Ref, SPid} -> SPid; + {'DOWN', MRef, _, SPid, _} -> dead + end. + +sync_receive_credit(SPid, MRef, Ref) -> + receive + {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), + sync_receive_credit(SPid, MRef, Ref); + {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), + dead + after 0 -> + SPid + end. + +sync_receive_complete(SPid, MRef, Ref) -> + SPid ! {sync_complete, Ref}, + receive + {sync_complete_ok, Ref, SPid} -> ok; + {'DOWN', MRef, _, SPid, _} -> ok + end, + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(SPid). + + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index cb7a2135c3..5ea146988a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,6 +222,15 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, MPid}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + MRef = erlang:monitor(process, MPid), + MPid ! {sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + noreply( + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})); + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -358,6 +367,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) -> + case lists:member(SPid, SPids) of + true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). @@ -829,3 +843,33 @@ record_synchronised(#amqqueue { name = QName }) -> ok end end). + +sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + receive + {'DOWN', MRef, process, MPid, _Reason} -> + %% If the master dies half way we are not in the usual + %% half-synced state (with messages nearer the tail of the + %% queue; instead we have ones nearer the head. If we then + %% sync with a newly promoted master, or even just receive + %% messages from it, we have a hole in the middle. So the + %% only thing to do here is purge.) + {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(MPid), + State#state{backing_queue_state = BQS1}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + sync_loop(Ref, MRef, MPid, State); + {sync_complete, Ref} -> + MPid ! {sync_complete_ok, Ref, self()}, + erlang:demonitor(MRef), + credit_flow:peer_down(MPid), + %% We can only sync when there are no pending acks + set_delta(0, State); + {sync_message, Ref, Msg, Props0} -> + credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), + Props = Props0#message_properties{needs_confirming = false, + delivered = true}, + BQS1 = BQ:publish(Msg, Props, none, BQS), + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}) + end. |
