summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-27 16:41:39 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-27 16:41:39 +0000
commit554f549a691cbd5d3057ef98914f34d396370012 (patch)
tree49f0bc0d2ae818973b71835935005ade88d972f4
parentd215ea6e9fd86d83310b130f0246c69f7c041dce (diff)
downloadrabbitmq-server-git-554f549a691cbd5d3057ef98914f34d396370012.tar.gz
Handle update_ram_duration correctly.
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
-rw-r--r--src/rabbit_mirror_queue_sync.erl30
2 files changed, 29 insertions, 21 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 93ba882b58..06cda0b83d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -225,15 +225,17 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
handle_cast({sync_start, Ref, MPid},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+ S = fun({TRefN, BQSN}) -> State1#state{rate_timer_ref = TRefN,
+ backing_queue_state = BQSN} end,
%% [0] We can only sync when there are no pending acks
%% [1] The master died so we do not need to set_delta even though
%% we purged since we will get a depth instruction soon.
- case rabbit_mirror_queue_sync:slave(Ref, MPid, BQ, BQS,
- fun update_ram_duration/2) of
- {ok, BQS1} -> noreply(set_delta(0, S(BQS1))); %% [0]
- {failed, BQS1} -> noreply(S(BQS1)); %% [1]
- {stop, R, BQS1} -> {stop, R, S(BQS1)}
+ case rabbit_mirror_queue_sync:slave(Ref, TRef, MPid, BQ, BQS,
+ fun update_ram_duration_sync/2) of
+ {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {failed, Res} -> noreply(S(Res)); %% [1]
+ {stop, Reason, Res} -> {stop, Reason, S(Res)}
end;
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -833,6 +835,12 @@ update_ram_duration(State = #state { backing_queue = BQ,
State#state{rate_timer_ref = just_measured,
backing_queue_state = update_ram_duration(BQ, BQS)}.
+update_ram_duration_sync(BQ, BQS) ->
+ BQS1 = update_ram_duration(BQ, BQS),
+ TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRef, BQS1}.
+
update_ram_duration(BQ, BQS) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 1a7cdbb969..cdf35eb27a 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master/5, slave/5]).
+-export([master/5, slave/6]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -98,47 +98,47 @@ sync_receive_complete(SPid, MRef, Ref) ->
%% ---------------------------------------------------------------------------
-slave(Ref, MPid, BQ, BQS, UpdateRamDuration) ->
+slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, MPid),
MPid ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration).
+ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDuration).
-slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration) ->
+slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) ->
receive
{'DOWN', MRef, process, MPid, _Reason} ->
%% If the master dies half way we are not in the usual
%% half-synced state (with messages nearer the tail of the
- %% queue; instead we have ones nearer the head. If we then
+ %% queue); instead we have ones nearer the head. If we then
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
- %% only thing to do here is purge.)
+ %% only thing to do here is purge.
{_MsgCount, BQS1} = BQ:purge(BQS),
credit_flow:peer_down(MPid),
- {failed, BQS1};
+ {failed, {TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration);
+ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
{sync_complete, Ref} ->
MPid ! {sync_complete_ok, Ref, self()},
erlang:demonitor(MRef),
credit_flow:peer_down(MPid),
- {ok, BQS};
+ {ok, {TRef, BQS}};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration);
+ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
update_ram_duration ->
- BQS1 = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+ {TRef2, BQS1} = UpdateRamDur(BQ, BQS),
+ slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur);
{sync_message, Ref, Msg, Props0} ->
credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
Props = Props0#message_properties{needs_confirming = false,
delivered = true},
BQS1 = BQ:publish(Msg, Props, none, BQS),
- slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
{'EXIT', _Pid, Reason} ->
- {stop, Reason, BQS}
+ {stop, Reason, {TRef, BQS}}
end.