diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-02 12:24:48 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-02 12:24:48 +0100 |
| commit | efd50fbde7bdf15655325bd0fbdb5da2d189e020 (patch) | |
| tree | 9b7ac373e5c48710f630cac029d4f08564a77e0b /src | |
| parent | c4cbc18a833b9b8c9ec09d5bcb975ee5895bb95a (diff) | |
| download | rabbitmq-server-git-efd50fbde7bdf15655325bd0fbdb5da2d189e020.tar.gz | |
refactoring of sync timer setting
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 35 |
1 files changed, 14 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index be4aac32ba..809f0cc68b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -159,17 +159,21 @@ noreply(NewState) -> {noreply, NewState1, Timeout}. next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)). - -next_state1(State = #q{sync_timer_ref = undefined}, Fun) - when Fun =/= undefined -> - {start_sync_timer(State, Fun), 0}; -next_state1(State, Fun) when Fun =/= undefined -> - {State, 0}; -next_state1(State = #q{sync_timer_ref = undefined}, undefined) -> + set_sync_timer(ensure_rate_timer(State), BQ:sync_callback(BQS)). + +set_sync_timer(State = #q{sync_timer_ref = undefined}, undefined) -> {State, hibernate}; -next_state1(State, undefined) -> - {stop_sync_timer(State#q{backing_queue_timeout_fun = undefined}), hibernate}. +set_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, + maybe_run_queue_via_backing_queue, [self(), Fun]), + {State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}, 0}; +set_sync_timer(State = #q{sync_timer_ref = TRef}, undefined) -> + {ok, cancel} = timer:cancel(TRef), + {State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}, + hibernate}; +set_sync_timer(State, _Fun) -> + {State, 0}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, @@ -188,17 +192,6 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. -start_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) - when Fun =/= undefined -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, - maybe_run_queue_via_backing_queue, [self(), Fun]), - State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}. - -stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). |
