summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-02 12:24:48 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-02 12:24:48 +0100
commitefd50fbde7bdf15655325bd0fbdb5da2d189e020 (patch)
tree9b7ac373e5c48710f630cac029d4f08564a77e0b /src
parentc4cbc18a833b9b8c9ec09d5bcb975ee5895bb95a (diff)
downloadrabbitmq-server-git-efd50fbde7bdf15655325bd0fbdb5da2d189e020.tar.gz
refactoring of sync timer setting
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl35
1 files changed, 14 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index be4aac32ba..809f0cc68b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -159,17 +159,21 @@ noreply(NewState) ->
{noreply, NewState1, Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)).
-
-next_state1(State = #q{sync_timer_ref = undefined}, Fun)
- when Fun =/= undefined ->
- {start_sync_timer(State, Fun), 0};
-next_state1(State, Fun) when Fun =/= undefined ->
- {State, 0};
-next_state1(State = #q{sync_timer_ref = undefined}, undefined) ->
+ set_sync_timer(ensure_rate_timer(State), BQ:sync_callback(BQS)).
+
+set_sync_timer(State = #q{sync_timer_ref = undefined}, undefined) ->
{State, hibernate};
-next_state1(State, undefined) ->
- {stop_sync_timer(State#q{backing_queue_timeout_fun = undefined}), hibernate}.
+set_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue,
+ maybe_run_queue_via_backing_queue, [self(), Fun]),
+ {State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}, 0};
+set_sync_timer(State = #q{sync_timer_ref = TRef}, undefined) ->
+ {ok, cancel} = timer:cancel(TRef),
+ {State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined},
+ hibernate};
+set_sync_timer(State, _Fun) ->
+ {State, 0}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue,
@@ -188,17 +192,6 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-start_sync_timer(State = #q{sync_timer_ref = undefined}, Fun)
- when Fun =/= undefined ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue,
- maybe_run_queue_via_backing_queue, [self(), Fun]),
- State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}.
-
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).