summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-02 15:05:50 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-02 15:05:50 +0100
commit74979e5c28b2577b6f1cae2d9c49e70d1bba5bb9 (patch)
tree0c3ab5f5c3ecad68534d67837bba811e67359459
parent835f6480f5f0c00c60c056807e4ea0c29eda8159 (diff)
downloadrabbitmq-server-git-74979e5c28b2577b6f1cae2d9c49e70d1bba5bb9.tar.gz
restructure backing queue sync'ing
The code very much relied on the fact that sync_callback would always return the same fun when a sync was required. So it makes sense to capture that in the API by splitting sync_callback into a 'needs_sync' predicate and a separate callback handler. As a result we do not need the backing_queue_timeout_fun state member.
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_backing_queue.erl11
-rw-r--r--src/rabbit_invariable_queue.erl9
-rw-r--r--src/rabbit_variable_queue.erl8
5 files changed, 31 insertions, 29 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 0a0931ea34..1b536dfad1 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -57,7 +57,7 @@
-spec(set_ram_duration_target/2 ::
(('undefined' | 'infinity' | number()), state()) -> state()).
-spec(ram_duration/1 :: (state()) -> {number(), state()}).
--spec(sync_callback/1 :: (state()) ->
- ('undefined' | (fun ((A) -> {boolean(), A})))).
+-spec(needs_sync/1 :: (state()) -> boolean()).
+-spec(sync/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 809f0cc68b..ab88a3c294 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,7 +55,6 @@
has_had_consumers,
backing_queue,
backing_queue_state,
- backing_queue_timeout_fun,
active_consumers,
blocked_consumers,
sync_timer_ref,
@@ -110,7 +109,6 @@ init(Q) ->
has_had_consumers = false,
backing_queue = BQ,
backing_queue_state = undefined,
- backing_queue_timeout_fun = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
@@ -159,19 +157,20 @@ noreply(NewState) ->
{noreply, NewState1, Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- set_sync_timer(ensure_rate_timer(State), BQ:sync_callback(BQS)).
+ set_sync_timer(ensure_rate_timer(State), BQ:needs_sync(BQS)).
-set_sync_timer(State = #q{sync_timer_ref = undefined}, undefined) ->
+set_sync_timer(State = #q{sync_timer_ref = undefined}, false) ->
{State, hibernate};
-set_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) ->
+set_sync_timer(State = #q{sync_timer_ref = undefined,
+ backing_queue = BQ}, true) ->
{ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue,
- maybe_run_queue_via_backing_queue, [self(), Fun]),
- {State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}, 0};
-set_sync_timer(State = #q{sync_timer_ref = TRef}, undefined) ->
+ ?SYNC_INTERVAL,
+ rabbit_amqqueue, maybe_run_queue_via_backing_queue,
+ [self(), fun (BQS) -> BQ:sync(BQS) end]),
+ {State#q{sync_timer_ref = TRef}, 0};
+set_sync_timer(State = #q{sync_timer_ref = TRef}, false) ->
{ok, cancel} = timer:cancel(TRef),
- {State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined},
- hibernate};
+ {State#q{sync_timer_ref = undefined}, hibernate};
set_sync_timer(State, _Fun) ->
{State, 0}.
@@ -823,12 +822,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) ->
- noreply(State);
-
-handle_info(timeout, State = #q{backing_queue_timeout_fun = Fun}) ->
+handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- Fun, State#q{backing_queue_timeout_fun = undefined}));
+ fun (BQS) -> BQ:sync(BQS) end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 583cd4da27..d9c898201c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -113,11 +113,12 @@ behaviour_info(callbacks) ->
%% queue.
{ram_duration, 1},
- %% Can return 'undefined' or a thunk which will receive the
- %% state, and must return the state, which will be invoked as
- %% soon as the queue process can manage (either on an empty
- %% mailbox, or when a timer fires).
- {sync_callback, 1},
+ %% Should 'sync' be called as soon as the queue process can
+ %% manage (either on an empty mailbox, or when a timer fires)?
+ {needs_sync, 1},
+
+ %% Called (eventually) after needs_sync returns 'true'.
+ {sync, 1},
%% Called immediately before the queue hibernates.
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index bee97651d6..722ea321b5 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -34,7 +34,7 @@
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, sync_callback/1,
+ set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -191,8 +191,11 @@ set_ram_duration_target(_DurationTarget, State) ->
ram_duration(State) ->
{0, State}.
-sync_callback(_State) ->
- undefined.
+needs_sync(_State) ->
+ false.
+
+sync(State) ->
+ State.
handle_pre_hibernate(State) ->
State.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 35d2b19145..b5cf98450d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/3, terminate/1, publish/2, publish_delivered/3,
set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1,
is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3,
- tx_ack/3, tx_rollback/2, tx_commit/3, sync_callback/1,
+ tx_ack/3, tx_rollback/2, tx_commit/3, needs_sync/1, sync/1,
handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -685,8 +685,10 @@ ram_duration(State = #vqstate { egress_rate = Egress,
ram_msg_count_prev = RamMsgCount,
out_counter = 0, in_counter = 0 })}.
-sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined;
-sync_callback(_) -> fun tx_commit_index/1.
+needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
+needs_sync(_) -> true.
+
+sync(State) -> tx_commit_index(State).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =