summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_slave.erl31
1 files changed, 21 insertions, 10 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 5ea146988a..965ea09017 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -241,15 +241,8 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State #state { rate_timer_ref = just_measured,
- backing_queue_state = BQS2 });
+handle_info(update_ram_duration, State) ->
+ noreply(update_ram_duration(State));
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
@@ -830,6 +823,15 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+update_ram_duration(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 }.
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
@@ -845,7 +847,7 @@ record_synchronised(#amqqueue { name = QName }) ->
end).
sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
receive
{'DOWN', MRef, process, MPid, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -866,6 +868,15 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
credit_flow:peer_down(MPid),
%% We can only sync when there are no pending acks
set_delta(0, State);
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ sync_loop(Ref, MRef, MPid, State);
+ {'$gen_cast', {set_ram_duration_target, Duration}} ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ sync_loop(Ref, MRef, MPid,
+ State#state{backing_queue_state = BQS1});
+ update_ram_duration ->
+ sync_loop(Ref, MRef, MPid, update_ram_duration(State));
{sync_message, Ref, Msg, Props0} ->
credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
Props = Props0#message_properties{needs_confirming = false,