diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-18 17:55:36 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-18 17:55:36 +0000 |
| commit | ebcbafd902b299e9496c85452836b00add8f59de (patch) | |
| tree | a0b52eac7b00d23de3e4950853a97c1c3b02115d /src | |
| parent | fdeb4e55099bc30286f7a90ac154f87c1c17b01a (diff) | |
| download | rabbitmq-server-git-ebcbafd902b299e9496c85452836b00add8f59de.tar.gz | |
Specialise the sync_timer in order to permit us to unset the sync_timer_ref
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 |
2 files changed, 13 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index db07f1367a..41a0ca12bd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, maybe_run_queue_via_backing_queue_async/2, - update_ram_duration/1, set_ram_duration_target/2, + sync_timer/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, @@ -157,6 +157,7 @@ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). +-spec(sync_timer/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -465,6 +466,9 @@ maybe_run_queue_via_backing_queue(QPid, Fun) -> maybe_run_queue_via_backing_queue_async(QPid, Fun) -> gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +sync_timer(QPid) -> + gen_server2:cast(QPid, sync_timer). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 38b83117f3..c028509a51 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -224,11 +224,9 @@ next_state(State) -> false -> {stop_sync_timer(State2), hibernate} end. -ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> +ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, - rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timer, [self()]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -797,6 +795,7 @@ prioritise_cast(Msg, _State) -> {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {maybe_run_queue_via_backing_queue, _Fun} -> 6; + sync_timer -> 6; _ -> 0 end. @@ -1017,6 +1016,11 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast(sync_timer, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), + sync_timer_ref = undefined}); + handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), |
