diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
4 files changed, 29 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 809f0cc68b..ab88a3c294 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,7 +55,6 @@ has_had_consumers, backing_queue, backing_queue_state, - backing_queue_timeout_fun, active_consumers, blocked_consumers, sync_timer_ref, @@ -110,7 +109,6 @@ init(Q) -> has_had_consumers = false, backing_queue = BQ, backing_queue_state = undefined, - backing_queue_timeout_fun = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, @@ -159,19 +157,20 @@ noreply(NewState) -> {noreply, NewState1, Timeout}. next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - set_sync_timer(ensure_rate_timer(State), BQ:sync_callback(BQS)). + set_sync_timer(ensure_rate_timer(State), BQ:needs_sync(BQS)). -set_sync_timer(State = #q{sync_timer_ref = undefined}, undefined) -> +set_sync_timer(State = #q{sync_timer_ref = undefined}, false) -> {State, hibernate}; -set_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) -> +set_sync_timer(State = #q{sync_timer_ref = undefined, + backing_queue = BQ}, true) -> {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, - maybe_run_queue_via_backing_queue, [self(), Fun]), - {State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}, 0}; -set_sync_timer(State = #q{sync_timer_ref = TRef}, undefined) -> + ?SYNC_INTERVAL, + rabbit_amqqueue, maybe_run_queue_via_backing_queue, + [self(), fun (BQS) -> BQ:sync(BQS) end]), + {State#q{sync_timer_ref = TRef}, 0}; +set_sync_timer(State = #q{sync_timer_ref = TRef}, false) -> {ok, cancel} = timer:cancel(TRef), - {State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}, - hibernate}; + {State#q{sync_timer_ref = undefined}, hibernate}; set_sync_timer(State, _Fun) -> {State, 0}. @@ -823,12 +822,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) -> - noreply(State); - -handle_info(timeout, State = #q{backing_queue_timeout_fun = Fun}) -> +handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - Fun, State#q{backing_queue_timeout_fun = undefined})); + fun (BQS) -> BQ:sync(BQS) end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 583cd4da27..d9c898201c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -113,11 +113,12 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Can return 'undefined' or a thunk which will receive the - %% state, and must return the state, which will be invoked as - %% soon as the queue process can manage (either on an empty - %% mailbox, or when a timer fires). - {sync_callback, 1}, + %% Should 'sync' be called as soon as the queue process can + %% manage (either on an empty mailbox, or when a timer fires)? + {needs_sync, 1}, + + %% Called (eventually) after needs_sync returns 'true'. + {sync, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index bee97651d6..722ea321b5 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, sync_callback/1, + set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -191,8 +191,11 @@ set_ram_duration_target(_DurationTarget, State) -> ram_duration(State) -> {0, State}. -sync_callback(_State) -> - undefined. +needs_sync(_State) -> + false. + +sync(State) -> + State. handle_pre_hibernate(State) -> State. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 35d2b19145..b5cf98450d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, publish/2, publish_delivered/3, set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3, - tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1, + tx_ack/3, tx_rollback/2, tx_commit/3, needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -685,8 +685,10 @@ ram_duration(State = #vqstate { egress_rate = Egress, ram_msg_count_prev = RamMsgCount, out_counter = 0, in_counter = 0 })}. -sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined; -sync_callback(_) -> fun tx_commit_index/1. +needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; +needs_sync(_) -> true. + +sync(State) -> tx_commit_index(State). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = |
