diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-02 15:05:50 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-02 15:05:50 +0100 |
| commit | 74979e5c28b2577b6f1cae2d9c49e70d1bba5bb9 (patch) | |
| tree | 0c3ab5f5c3ecad68534d67837bba811e67359459 | |
| parent | 835f6480f5f0c00c60c056807e4ea0c29eda8159 (diff) | |
| download | rabbitmq-server-git-74979e5c28b2577b6f1cae2d9c49e70d1bba5bb9.tar.gz | |
restructure backing queue sync'ing
The code very much relied on the fact that sync_callback would always
return the same fun when a sync was required. So it makes sense to
capture that in the API by splitting sync_callback into a 'needs_sync'
predicate and a separate callback handler.
As a result we do not need the backing_queue_timeout_fun state member.
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
5 files changed, 31 insertions, 29 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 0a0931ea34..1b536dfad1 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -57,7 +57,7 @@ -spec(set_ram_duration_target/2 :: (('undefined' | 'infinity' | number()), state()) -> state()). -spec(ram_duration/1 :: (state()) -> {number(), state()}). --spec(sync_callback/1 :: (state()) -> - ('undefined' | (fun ((A) -> {boolean(), A})))). +-spec(needs_sync/1 :: (state()) -> boolean()). +-spec(sync/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 809f0cc68b..ab88a3c294 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,7 +55,6 @@ has_had_consumers, backing_queue, backing_queue_state, - backing_queue_timeout_fun, active_consumers, blocked_consumers, sync_timer_ref, @@ -110,7 +109,6 @@ init(Q) -> has_had_consumers = false, backing_queue = BQ, backing_queue_state = undefined, - backing_queue_timeout_fun = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, @@ -159,19 +157,20 @@ noreply(NewState) -> {noreply, NewState1, Timeout}. next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - set_sync_timer(ensure_rate_timer(State), BQ:sync_callback(BQS)). + set_sync_timer(ensure_rate_timer(State), BQ:needs_sync(BQS)). -set_sync_timer(State = #q{sync_timer_ref = undefined}, undefined) -> +set_sync_timer(State = #q{sync_timer_ref = undefined}, false) -> {State, hibernate}; -set_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) -> +set_sync_timer(State = #q{sync_timer_ref = undefined, + backing_queue = BQ}, true) -> {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, - maybe_run_queue_via_backing_queue, [self(), Fun]), - {State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}, 0}; -set_sync_timer(State = #q{sync_timer_ref = TRef}, undefined) -> + ?SYNC_INTERVAL, + rabbit_amqqueue, maybe_run_queue_via_backing_queue, + [self(), fun (BQS) -> BQ:sync(BQS) end]), + {State#q{sync_timer_ref = TRef}, 0}; +set_sync_timer(State = #q{sync_timer_ref = TRef}, false) -> {ok, cancel} = timer:cancel(TRef), - {State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}, - hibernate}; + {State#q{sync_timer_ref = undefined}, hibernate}; set_sync_timer(State, _Fun) -> {State, 0}. @@ -823,12 +822,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) -> - noreply(State); - -handle_info(timeout, State = #q{backing_queue_timeout_fun = Fun}) -> +handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - Fun, State#q{backing_queue_timeout_fun = undefined})); + fun (BQS) -> BQ:sync(BQS) end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 583cd4da27..d9c898201c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -113,11 +113,12 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Can return 'undefined' or a thunk which will receive the - %% state, and must return the state, which will be invoked as - %% soon as the queue process can manage (either on an empty - %% mailbox, or when a timer fires). - {sync_callback, 1}, + %% Should 'sync' be called as soon as the queue process can + %% manage (either on an empty mailbox, or when a timer fires)? + {needs_sync, 1}, + + %% Called (eventually) after needs_sync returns 'true'. + {sync, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index bee97651d6..722ea321b5 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, sync_callback/1, + set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -191,8 +191,11 @@ set_ram_duration_target(_DurationTarget, State) -> ram_duration(State) -> {0, State}. -sync_callback(_State) -> - undefined. +needs_sync(_State) -> + false. + +sync(State) -> + State. handle_pre_hibernate(State) -> State. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 35d2b19145..b5cf98450d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, publish/2, publish_delivered/3, set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3, - tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1, + tx_ack/3, tx_rollback/2, tx_commit/3, needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -685,8 +685,10 @@ ram_duration(State = #vqstate { egress_rate = Egress, ram_msg_count_prev = RamMsgCount, out_counter = 0, in_counter = 0 })}. -sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined; -sync_callback(_) -> fun tx_commit_index/1. +needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; +needs_sync(_) -> true. + +sync(State) -> tx_commit_index(State). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = |
