summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-12-14 11:36:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-12-14 11:36:02 +0000
commit2c35646ffc45c7ca262fe1c00600e0866b991c80 (patch)
treebb63e9b78a47c25697a2d7e0e7ce88c72cccfed0 /src
parent54082cc9333d45e2063cfe1c13a3e52a23d30e89 (diff)
downloadrabbitmq-server-git-2c35646ffc45c7ca262fe1c00600e0866b991c80.tar.gz
Ensure rabbitmqctl list_queues remains responsive during sync, and emit stats events too.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_sync.erl18
3 files changed, 40 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 730c235e53..5d9da20491 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -97,7 +98,8 @@
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
@@ -138,7 +140,8 @@ init(Q) ->
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
+ msg_id_to_channel = MTC,
+ status = running},
State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
@@ -934,6 +938,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1157,8 +1163,22 @@ handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ InfoPull = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ InfoPush = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(
+ InfoPull, InfoPush, BQS) of
{shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)};
{Result, BQS1} -> reply(Result, S(BQS1))
end;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c9b6269b58..601649ef0a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -127,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors(State = #state { name = QName,
+sync_mirrors(InfoPull, InfoPush,
+ State = #state { name = QName,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -141,7 +142,8 @@ sync_mirrors(State = #state { name = QName,
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
- case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index bb12cf491a..c10d8fd328 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/5, slave/7]).
+-export([master_prepare/3, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -59,16 +59,17 @@ master_prepare(Ref, Log, SPids) ->
MPid = self(),
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
-master_go(Syncer, Ref, Log, BQ, BQS) ->
- Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+master_go(Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, InfoPush, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
- {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ {ready, Syncer} -> master_go0(InfoPull, Args, BQ, BQS)
end.
-master_go0(Args, BQ, BQS) ->
+master_go0(InfoPull, Args, BQ, BQS) ->
case BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
+ InfoPull({synchronising, I}),
master_send(Args, I, Last, Msg, MsgProps)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
@@ -76,10 +77,11 @@ master_go0(Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) ->
+master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) ->
Acc = {I + 1,
case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
- true -> Log("~p messages", [I]),
+ true -> InfoPush({synchronising, I}),
+ Log("~p messages", [I]),
erlang:now();
false -> Last
end},
@@ -96,7 +98,7 @@ master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) ->
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
-master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+master_done({Syncer, Ref, _Log, _InfoPush, Parent}, BQS) ->
receive
{next, Ref} -> unlink(Syncer),
Syncer ! {done, Ref},