diff options
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5ea146988a..965ea09017 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -241,15 +241,8 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }); +handle_info(update_ram_duration, State) -> + noreply(update_ram_duration(State)); handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( @@ -830,6 +823,15 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). +update_ram_duration(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }. + record_synchronised(#amqqueue { name = QName }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( @@ -845,7 +847,7 @@ record_synchronised(#amqqueue { name = QName }) -> end). sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> receive {'DOWN', MRef, process, MPid, _Reason} -> %% If the master dies half way we are not in the usual @@ -866,6 +868,15 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, credit_flow:peer_down(MPid), %% We can only sync when there are no pending acks set_delta(0, State); + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age), + sync_loop(Ref, MRef, MPid, State); + {'$gen_cast', {set_ram_duration_target, Duration}} -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + sync_loop(Ref, MRef, MPid, + State#state{backing_queue_state = BQS1}); + update_ram_duration -> + sync_loop(Ref, MRef, MPid, update_ram_duration(State)); {sync_message, Ref, Msg, Props0} -> credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), Props = Props0#message_properties{needs_confirming = false, |
