summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-18 17:55:36 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-18 17:55:36 +0000
commitebcbafd902b299e9496c85452836b00add8f59de (patch)
treea0b52eac7b00d23de3e4950853a97c1c3b02115d /src
parentfdeb4e55099bc30286f7a90ac154f87c1c17b01a (diff)
downloadrabbitmq-server-git-ebcbafd902b299e9496c85452836b00add8f59de.tar.gz
Specialise the sync_timer in order to permit us to unset the sync_timer_ref
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl12
2 files changed, 13 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index db07f1367a..41a0ca12bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
maybe_run_queue_via_backing_queue_async/2,
- update_ram_duration/1, set_ram_duration_target/2,
+ sync_timer/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
@@ -157,6 +157,7 @@
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(sync_timer/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -465,6 +466,9 @@ maybe_run_queue_via_backing_queue(QPid, Fun) ->
maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+sync_timer(QPid) ->
+ gen_server2:cast(QPid, sync_timer).
+
update_ram_duration(QPid) ->
gen_server2:cast(QPid, update_ram_duration).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 38b83117f3..c028509a51 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -224,11 +224,9 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
+ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL,
- rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timer, [self()]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -797,6 +795,7 @@ prioritise_cast(Msg, _State) ->
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ sync_timer -> 6;
_ -> 0
end.
@@ -1017,6 +1016,11 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast(sync_timer, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS),
+ sync_timer_ref = undefined});
+
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Delivery, State),