diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-27 16:41:39 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-27 16:41:39 +0000 |
| commit | 554f549a691cbd5d3057ef98914f34d396370012 (patch) | |
| tree | 49f0bc0d2ae818973b71835935005ade88d972f4 | |
| parent | d215ea6e9fd86d83310b130f0246c69f7c041dce (diff) | |
| download | rabbitmq-server-git-554f549a691cbd5d3057ef98914f34d396370012.tar.gz | |
Handle update_ram_duration correctly.
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 30 |
2 files changed, 29 insertions, 21 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 93ba882b58..06cda0b83d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -225,15 +225,17 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, handle_cast({sync_start, Ref, MPid}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, + State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), + S = fun({TRefN, BQSN}) -> State1#state{rate_timer_ref = TRefN, + backing_queue_state = BQSN} end, %% [0] We can only sync when there are no pending acks %% [1] The master died so we do not need to set_delta even though %% we purged since we will get a depth instruction soon. - case rabbit_mirror_queue_sync:slave(Ref, MPid, BQ, BQS, - fun update_ram_duration/2) of - {ok, BQS1} -> noreply(set_delta(0, S(BQS1))); %% [0] - {failed, BQS1} -> noreply(S(BQS1)); %% [1] - {stop, R, BQS1} -> {stop, R, S(BQS1)} + case rabbit_mirror_queue_sync:slave(Ref, TRef, MPid, BQ, BQS, + fun update_ram_duration_sync/2) of + {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {failed, Res} -> noreply(S(Res)); %% [1] + {stop, Reason, Res} -> {stop, Reason, S(Res)} end; handle_cast({set_maximum_since_use, Age}, State) -> @@ -833,6 +835,12 @@ update_ram_duration(State = #state { backing_queue = BQ, State#state{rate_timer_ref = just_measured, backing_queue_state = update_ram_duration(BQ, BQS)}. +update_ram_duration_sync(BQ, BQS) -> + BQS1 = update_ram_duration(BQ, BQS), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRef, BQS1}. + update_ram_duration(BQ, BQS) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 1a7cdbb969..cdf35eb27a 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master/5, slave/5]). +-export([master/5, slave/6]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -98,47 +98,47 @@ sync_receive_complete(SPid, MRef, Ref) -> %% --------------------------------------------------------------------------- -slave(Ref, MPid, BQ, BQS, UpdateRamDuration) -> +slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, MPid), MPid ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration). + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDuration). -slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration) -> +slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) -> receive {'DOWN', MRef, process, MPid, _Reason} -> %% If the master dies half way we are not in the usual %% half-synced state (with messages nearer the tail of the - %% queue; instead we have ones nearer the head. If we then + %% queue); instead we have ones nearer the head. If we then %% sync with a newly promoted master, or even just receive %% messages from it, we have a hole in the middle. So the - %% only thing to do here is purge.) + %% only thing to do here is purge. {_MsgCount, BQS1} = BQ:purge(BQS), credit_flow:peer_down(MPid), - {failed, BQS1}; + {failed, {TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration); + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); {sync_complete, Ref} -> MPid ! {sync_complete_ok, Ref, self()}, erlang:demonitor(MRef), credit_flow:peer_down(MPid), - {ok, BQS}; + {ok, {TRef, BQS}}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration); + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); {'$gen_cast', {set_ram_duration_target, Duration}} -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); update_ram_duration -> - BQS1 = UpdateRamDuration(BQ, BQS), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + {TRef2, BQS1} = UpdateRamDur(BQ, BQS), + slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur); {sync_message, Ref, Msg, Props0} -> credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), Props = Props0#message_properties{needs_confirming = false, delivered = true}, BQS1 = BQ:publish(Msg, Props, none, BQS), - slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); {'EXIT', _Pid, Reason} -> - {stop, Reason, BQS} + {stop, Reason, {TRef, BQS}} end. |
