summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-09 16:59:49 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-09 16:59:49 +0100
commiteefe36bc371f32cb6cf824508e8260f65d94a2bf (patch)
tree4175bd0c04a0925b8ac9e83d8a9562f4bc746849
parent61bdae34fbe1a34f6f9afb7c5003866369d4ce6d (diff)
downloadrabbitmq-server-git-eefe36bc371f32cb6cf824508e8260f65d94a2bf.tar.gz
Converted the maybe_run_queue_via_backing_queue to take a thunk, and also vq:needs_sync => vq:sync_callback
-rw-r--r--include/rabbit_backing_queue_type_spec.hrl2
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_backing_queue_type.erl2
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_tests.erl13
-rw-r--r--src/rabbit_variable_queue.erl15
7 files changed, 34 insertions, 40 deletions
diff --git a/include/rabbit_backing_queue_type_spec.hrl b/include/rabbit_backing_queue_type_spec.hrl
index 5db43bb612..f0a81aad4d 100644
--- a/include/rabbit_backing_queue_type_spec.hrl
+++ b/include/rabbit_backing_queue_type_spec.hrl
@@ -51,6 +51,6 @@
(('undefined' | 'infinity' | number()), state()) -> state()).
-spec(update_ram_duration/1 :: (state()) -> state()).
-spec(ram_duration/1 :: (state()) -> number()).
--spec(needs_sync/1 :: (state()) -> ('undefined' | {atom(), [any()]})).
+-spec(sync_callback/1 :: (state()) -> ('undefined' | {atom(), [any()]})).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6e6b4c676f..7d4566025b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,7 +41,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/3,
+-export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/2,
flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -109,7 +109,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(maybe_run_queue_via_backing_queue/3 :: (pid(), atom(), [any()]) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/2 ::
+ (pid(), (fun ((A) -> {boolean(), A}))) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
@@ -350,8 +351,8 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
-maybe_run_queue_via_backing_queue(QPid, Fun, Args) ->
- gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun, Args}).
+maybe_run_queue_via_backing_queue(QPid, Fun) ->
+ gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}).
flush_all(QPids, ChPid) ->
safe_pmap_ok(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a20cd6c3b1..a4d653e289 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -190,11 +190,12 @@ noreply(NewState) ->
next_state(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- next_state1(ensure_rate_timer(State), BQ:needs_sync(BQS)).
+ next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)).
-next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) ->
- {start_sync_timer(State, Callback), 0};
-next_state1(State, {_Fun, _Args}) ->
+next_state1(State = #q{sync_timer_ref = undefined}, Fun)
+ when Fun =/= undefined ->
+ {start_sync_timer(State, Fun), 0};
+next_state1(State, Fun) when Fun =/= undefined ->
{State, 0};
next_state1(State = #q{sync_timer_ref = undefined}, undefined) ->
{State, hibernate};
@@ -218,12 +219,12 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-start_sync_timer(State = #q{sync_timer_ref = undefined},
- Callback = {Fun, Args}) ->
+start_sync_timer(State = #q{sync_timer_ref = undefined}, Fun)
+ when Fun =/= undefined ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue,
- maybe_run_queue_via_backing_queue, [self(), Fun, Args]),
- State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Callback}.
+ maybe_run_queue_via_backing_queue, [self(), Fun]),
+ State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}.
stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
@@ -524,10 +525,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-maybe_run_queue_via_backing_queue(Fun, Args,
- State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
- {RunQueue, BQS1} = apply(BQ, Fun, Args ++ [BQS]),
+maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+ {RunQueue, BQS1} = Fun(BQS),
State1 = State#q{backing_queue_state = BQS1},
case RunQueue of
true -> run_message_queue(State1);
@@ -908,8 +907,8 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({maybe_run_queue_via_backing_queue, Fun, Args}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, Args, State));
+handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, State));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -973,9 +972,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) ->
noreply(State);
-handle_info(timeout, State = #q{backing_queue_timeout_fun = {Fun, Args}}) ->
+handle_info(timeout, State = #q{backing_queue_timeout_fun = Fun}) ->
noreply(maybe_run_queue_via_backing_queue(
- Fun, Args, State#q{backing_queue_timeout_fun = undefined}));
+ Fun, State#q{backing_queue_timeout_fun = undefined}));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_backing_queue_type.erl b/src/rabbit_backing_queue_type.erl
index c484a7c255..3ccd71d09f 100644
--- a/src/rabbit_backing_queue_type.erl
+++ b/src/rabbit_backing_queue_type.erl
@@ -116,7 +116,7 @@ behaviour_info(callbacks) ->
%% arguments to be invoked in the internal queue module as soon
%% as the queue process can manage (either on an empty mailbox,
%% or when a timer fires).
- {needs_sync, 1},
+ {sync_callback, 1},
%% Called immediately before the queue hibernates
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index aaef03afb2..a5583b8765 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -211,7 +211,7 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(start_msg_stores/1 :: ([amqqueue()]) -> 'ok').
+-spec(start_msg_stores/1 :: ([queue_name()]) -> 'ok').
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 29699829dc..d374561f83 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1196,13 +1196,6 @@ queue_name(Name) ->
test_queue() ->
queue_name(test).
-test_amqqueue(Durable) ->
- #amqqueue{name = test_queue(),
- durable = Durable,
- auto_delete = true,
- arguments = [],
- pid = none}.
-
empty_test_queue() ->
ok = rabbit_queue_index:start_msg_stores([]),
{0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false),
@@ -1266,7 +1259,7 @@ test_queue_index() ->
%% call terminate twice to prove it's idempotent
_Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_msg_stores([test_queue()]),
%% should get length back as 0, as all the msgs were transient
{0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
{0, 0, Qi7} =
@@ -1279,7 +1272,7 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_msg_stores([test_queue()]),
%% should get length back as 10000
LenB = length(SeqIdsB),
{LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false),
@@ -1296,7 +1289,7 @@ test_queue_index() ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate([], Qi18),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
+ ok = rabbit_queue_index:start_msg_stores([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
{0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7ee88deaf8..0048925ac0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -35,7 +35,7 @@
set_ram_duration_target/2, update_ram_duration/1,
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
- tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]).
+ tx_commit/4, sync_callback/1, handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -605,8 +605,11 @@ tx_commit(Pubs, AckTags, From, State =
ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, tx_commit_post_msg_store,
- [IsTransientPubs, Pubs, AckTags, From])
+ Self,
+ fun (StateN) -> tx_commit_post_msg_store(
+ IsTransientPubs, Pubs,
+ AckTags, From, StateN)
+ end)
end),
{false, State}
end.
@@ -660,10 +663,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
{Pubs /= [],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}.
-needs_sync(#vqstate { on_sync = {_, _, []} }) ->
- undefined;
-needs_sync(_) ->
- {tx_commit_index, []}.
+sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined;
+sync_callback(_) -> fun tx_commit_index/1.
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =