diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-12-14 11:36:02 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-12-14 11:36:02 +0000 |
| commit | 2c35646ffc45c7ca262fe1c00600e0866b991c80 (patch) | |
| tree | bb63e9b78a47c25697a2d7e0e7ce88c72cccfed0 /src | |
| parent | 54082cc9333d45e2063cfe1c13a3e52a23d30e89 (diff) | |
| download | rabbitmq-server-git-2c35646ffc45c7ca262fe1c00600e0866b991c80.tar.gz | |
Ensure rabbitmqctl list_queues remains responsive during sync, and emit stats events too.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 18 |
3 files changed, 40 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 730c235e53..5d9da20491 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + status }). -record(consumer, {tag, ack_required}). @@ -97,7 +98,8 @@ memory, slave_pids, synchronised_slave_pids, - backing_queue_status + backing_queue_status, + status ]). -define(CREATION_EVENT_KEYS, @@ -138,7 +140,8 @@ init(Q) -> unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = gb_trees:empty()}, + msg_id_to_channel = gb_trees:empty(), + status = running}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = MTC}, + msg_id_to_channel = MTC, + status = running}, State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) @@ -934,6 +938,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(status, #q{status = Status}) -> + Status; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1157,8 +1163,22 @@ handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + InfoPull = fun (Status) -> + receive {'$gen_call', From, {info, Items}} -> + Infos = infos(Items, State#q{status = Status}), + gen_server2:reply(From, {ok, Infos}) + after 0 -> + ok + end + end, + InfoPush = fun (Status) -> + rabbit_event:if_enabled( + State, #q.stats_timer, + fun() -> emit_stats(State#q{status = Status}) end) + end, case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors( + InfoPull, InfoPush, BQS) of {shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)}; {Result, BQS1} -> reply(Result, S(BQS1)) end; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c9b6269b58..601649ef0a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -127,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors(State = #state { name = QName, +sync_mirrors(InfoPull, InfoPush, + State = #state { name = QName, gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -141,7 +142,8 @@ sync_mirrors(State = #state { name = QName, Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, - case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + case rabbit_mirror_queue_sync:master_go( + Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index bb12cf491a..c10d8fd328 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/5, slave/7]). +-export([master_prepare/3, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -59,16 +59,17 @@ master_prepare(Ref, Log, SPids) -> MPid = self(), spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, BQ, BQS) -> - Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, +master_go(Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) -> + Args = {Syncer, Ref, Log, InfoPush, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(Args, BQ, BQS) + {ready, Syncer} -> master_go0(InfoPull, Args, BQ, BQS) end. -master_go0(Args, BQ, BQS) -> +master_go0(InfoPull, Args, BQ, BQS) -> case BQ:fold(fun (Msg, MsgProps, {I, Last}) -> + InfoPull({synchronising, I}), master_send(Args, I, Last, Msg, MsgProps) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; @@ -76,10 +77,11 @@ master_go0(Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) -> +master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) -> Acc = {I + 1, case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of - true -> Log("~p messages", [I]), + true -> InfoPush({synchronising, I}), + Log("~p messages", [I]), erlang:now(); false -> Last end}, @@ -96,7 +98,7 @@ master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) -> {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. -master_done({Syncer, Ref, _Log, Parent}, BQS) -> +master_done({Syncer, Ref, _Log, _InfoPush, Parent}, BQS) -> receive {next, Ref} -> unlink(Syncer), Syncer ! {done, Ref}, |
