diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-09 16:59:49 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-09 16:59:49 +0100 |
| commit | eefe36bc371f32cb6cf824508e8260f65d94a2bf (patch) | |
| tree | 4175bd0c04a0925b8ac9e83d8a9562f4bc746849 | |
| parent | 61bdae34fbe1a34f6f9afb7c5003866369d4ce6d (diff) | |
| download | rabbitmq-server-git-eefe36bc371f32cb6cf824508e8260f65d94a2bf.tar.gz | |
Converted the maybe_run_queue_via_backing_queue to take a thunk, and also vq:needs_sync => vq:sync_callback
| -rw-r--r-- | include/rabbit_backing_queue_type_spec.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_type.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
7 files changed, 34 insertions, 40 deletions
diff --git a/include/rabbit_backing_queue_type_spec.hrl b/include/rabbit_backing_queue_type_spec.hrl index 5db43bb612..f0a81aad4d 100644 --- a/include/rabbit_backing_queue_type_spec.hrl +++ b/include/rabbit_backing_queue_type_spec.hrl @@ -51,6 +51,6 @@ (('undefined' | 'infinity' | number()), state()) -> state()). -spec(update_ram_duration/1 :: (state()) -> state()). -spec(ram_duration/1 :: (state()) -> number()). --spec(needs_sync/1 :: (state()) -> ('undefined' | {atom(), [any()]})). +-spec(sync_callback/1 :: (state()) -> ('undefined' | {atom(), [any()]})). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6e6b4c676f..7d4566025b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,7 +41,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/3, +-export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/2, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -109,7 +109,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(maybe_run_queue_via_backing_queue/3 :: (pid(), atom(), [any()]) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue/2 :: + (pid(), (fun ((A) -> {boolean(), A}))) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). @@ -350,8 +351,8 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). -maybe_run_queue_via_backing_queue(QPid, Fun, Args) -> - gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun, Args}). +maybe_run_queue_via_backing_queue(QPid, Fun) -> + gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}). flush_all(QPids, ChPid) -> safe_pmap_ok( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a20cd6c3b1..a4d653e289 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -190,11 +190,12 @@ noreply(NewState) -> next_state(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - next_state1(ensure_rate_timer(State), BQ:needs_sync(BQS)). + next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)). -next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) -> - {start_sync_timer(State, Callback), 0}; -next_state1(State, {_Fun, _Args}) -> +next_state1(State = #q{sync_timer_ref = undefined}, Fun) + when Fun =/= undefined -> + {start_sync_timer(State, Fun), 0}; +next_state1(State, Fun) when Fun =/= undefined -> {State, 0}; next_state1(State = #q{sync_timer_ref = undefined}, undefined) -> {State, hibernate}; @@ -218,12 +219,12 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. -start_sync_timer(State = #q{sync_timer_ref = undefined}, - Callback = {Fun, Args}) -> +start_sync_timer(State = #q{sync_timer_ref = undefined}, Fun) + when Fun =/= undefined -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, - maybe_run_queue_via_backing_queue, [self(), Fun, Args]), - State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Callback}. + maybe_run_queue_via_backing_queue, [self(), Fun]), + State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Fun}. stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), @@ -524,10 +525,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -maybe_run_queue_via_backing_queue(Fun, Args, - State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - {RunQueue, BQS1} = apply(BQ, Fun, Args ++ [BQS]), +maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> + {RunQueue, BQS1} = Fun(BQS), State1 = State#q{backing_queue_state = BQS1}, case RunQueue of true -> run_message_queue(State1); @@ -908,8 +907,8 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({maybe_run_queue_via_backing_queue, Fun, Args}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, Args, State)); +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -973,9 +972,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) -> noreply(State); -handle_info(timeout, State = #q{backing_queue_timeout_fun = {Fun, Args}}) -> +handle_info(timeout, State = #q{backing_queue_timeout_fun = Fun}) -> noreply(maybe_run_queue_via_backing_queue( - Fun, Args, State#q{backing_queue_timeout_fun = undefined})); + Fun, State#q{backing_queue_timeout_fun = undefined})); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue_type.erl b/src/rabbit_backing_queue_type.erl index c484a7c255..3ccd71d09f 100644 --- a/src/rabbit_backing_queue_type.erl +++ b/src/rabbit_backing_queue_type.erl @@ -116,7 +116,7 @@ behaviour_info(callbacks) -> %% arguments to be invoked in the internal queue module as soon %% as the queue process can manage (either on an empty mailbox, %% or when a timer fires). - {needs_sync, 1}, + {sync_callback, 1}, %% Called immediately before the queue hibernates {handle_pre_hibernate, 1}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index aaef03afb2..a5583b8765 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -211,7 +211,7 @@ -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_msg_stores/1 :: ([amqqueue()]) -> 'ok'). +-spec(start_msg_stores/1 :: ([queue_name()]) -> 'ok'). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 29699829dc..d374561f83 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1196,13 +1196,6 @@ queue_name(Name) -> test_queue() -> queue_name(test). -test_amqqueue(Durable) -> - #amqqueue{name = test_queue(), - durable = Durable, - auto_delete = true, - arguments = [], - pid = none}. - empty_test_queue() -> ok = rabbit_queue_index:start_msg_stores([]), {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false), @@ -1266,7 +1259,7 @@ test_queue_index() -> %% call terminate twice to prove it's idempotent _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), + ok = rabbit_queue_index:start_msg_stores([test_queue()]), %% should get length back as 0, as all the msgs were transient {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), {0, 0, Qi7} = @@ -1279,7 +1272,7 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), + ok = rabbit_queue_index:start_msg_stores([test_queue()]), %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false), @@ -1296,7 +1289,7 @@ test_queue_index() -> rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate([], Qi18), ok = stop_msg_store(), - ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), + ok = rabbit_queue_index:start_msg_stores([test_queue()]), %% should get length back as 0 because all persistent msgs have been acked {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7ee88deaf8..0048925ac0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -35,7 +35,7 @@ set_ram_duration_target/2, update_ram_duration/1, ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, - tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]). + tx_commit/4, sync_callback/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -605,8 +605,11 @@ tx_commit(Pubs, AckTags, From, State = ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentMsgIds, fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, tx_commit_post_msg_store, - [IsTransientPubs, Pubs, AckTags, From]) + Self, + fun (StateN) -> tx_commit_post_msg_store( + IsTransientPubs, Pubs, + AckTags, From, StateN) + end) end), {false, State} end. @@ -660,10 +663,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, {Pubs /= [], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}. -needs_sync(#vqstate { on_sync = {_, _, []} }) -> - undefined; -needs_sync(_) -> - {tx_commit_index, []}. +sync_callback(#vqstate { on_sync = {_, _, []} }) -> undefined; +sync_callback(_) -> fun tx_commit_index/1. handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = |
