diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-23 18:13:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-23 18:13:16 +0100 |
| commit | d12aed0607ee8d49f5dcfab0f59226b676541f63 (patch) | |
| tree | aa7820a1f8f0735580c0d4ab144085059345cbce /src | |
| parent | 715b396ca692d3e57ff2a7445a7c126529026569 (diff) | |
| download | rabbitmq-server-git-d12aed0607ee8d49f5dcfab0f59226b676541f63.tar.gz | |
in queue tx coalescing is in. It works too - doubling the number of producers does not halve the tx commit rate for each producer. It does go down slightly, on each doubling, but appears more log like. Also, debugging shows that the coalescing really is working
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
3 files changed, 71 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d0a5f205a8..f421d6aa12 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,8 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4]). +-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4, + tx_commit_vq_callback/1]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -326,6 +327,9 @@ tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) -> gen_server2:pcast(QPid, 8, {tx_commit_msg_store_callback, Pubs, AckTags, From}). +tx_commit_vq_callback(QPid) -> + gen_server2:pcast(QPid, 8, tx_commit_vq_callback). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 434652a58b..1e37a98ffa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,6 +38,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(SYNC_INTERVAL, 5). %% milliseconds -export([start_link/1]). @@ -56,7 +57,8 @@ variable_queue_state, next_msg_id, active_consumers, - blocked_consumers + blocked_consumers, + sync_timer_ref }). -record(consumer, {tag, ack_required}). @@ -109,7 +111,8 @@ init(Q = #amqqueue { name = QName }) -> variable_queue_state = VQS, next_msg_id = 1, active_consumers = queue:new(), - blocked_consumers = queue:new() + blocked_consumers = queue:new(), + sync_timer_ref = undefined }, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -142,11 +145,34 @@ code_change(_OldVsn, State, _Extra) -> reply(Reply, NewState) -> assert_invariant(NewState), - {reply, Reply, NewState, hibernate}. + {NewState1, Timeout} = next_state(NewState), + {reply, Reply, NewState1, Timeout}. noreply(NewState) -> assert_invariant(NewState), - {noreply, NewState, hibernate}. + {NewState1, Timeout} = next_state(NewState), + {noreply, NewState1, Timeout}. + +next_state(State = #q { variable_queue_state = VQS }) -> + next_state1(State, rabbit_variable_queue:needs_sync(VQS)). + +next_state1(State = #q { sync_timer_ref = undefined }, true) -> + {start_sync_timer(State), 0}; +next_state1(State, true) -> + {State, 0}; +next_state1(State = #q { sync_timer_ref = undefined }, false) -> + {State, hibernate}; +next_state1(State, false) -> + {stop_sync_timer(State), 0}. + +start_sync_timer(State = #q { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue, + tx_commit_vq_callback, [self()]), + State #q { sync_timer_ref = TRef }. + +stop_sync_timer(State = #q { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #q { sync_timer_ref = undefined }. assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) -> true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)). @@ -791,10 +817,15 @@ handle_cast({notify_sent, ChPid}, State) -> handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From}, State = #q{variable_queue_state = VQS}) -> noreply( + State#q{variable_queue_state = + rabbit_variable_queue:tx_commit_from_msg_store( + Pubs, AckTags, From, VQS)}); + +handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) -> + noreply( run_message_queue( State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_msg_store( - Pubs, AckTags, From, VQS)})); + rabbit_variable_queue:tx_commit_from_vq(VQS)})); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -828,6 +859,12 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State = #q{variable_queue_state = VQS}) -> + noreply( + run_message_queue( + State#q{variable_queue_state = + rabbit_variable_queue:tx_commit_from_vq(VQS)})); + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d9520d0080..33e09c113d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,8 @@ -export([init/1, terminate/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, - requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4]). + requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, + tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1]). %%---------------------------------------------------------------------------- @@ -54,7 +55,8 @@ avg_egress_rate, egress_rate_timestamp, prefetcher, - len + len, + on_sync }). -record(alpha, @@ -136,7 +138,8 @@ init(QueueName) -> avg_egress_rate = 0, egress_rate_timestamp = now(), prefetcher = undefined, - len = GammaCount + len = GammaCount, + on_sync = {[], [], []} }, maybe_load_next_segment(State). @@ -378,10 +381,16 @@ tx_commit(Pubs, AckTags, From, State) -> {false, State} end. -tx_commit_from_msg_store(Pubs, AckTags, From, State) -> +tx_commit_from_msg_store(Pubs, AckTags, From, + State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> DiskAcks = lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), - State1 = ack(DiskAcks, State), + State #vqstate { on_sync = { [DiskAcks | SAcks], + [Pubs | SPubs], + [From | SFroms] }}. + +tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> + State1 = ack(lists:flatten(SAcks), State), {PubSeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -392,11 +401,16 @@ tx_commit_from_msg_store(Pubs, AckTags, From, State) -> false -> SeqIdsAcc end, {SeqIdsAcc1, StateN1} - end, {[], State1}, Pubs), + end, {[], State1}, lists:flatten(lists:reverse(SPubs))), IndexState1 = - rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= DiskAcks, IndexState), - gen_server2:reply(From, ok), - State2 #vqstate { index_state = IndexState1 }. + rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= SAcks, IndexState), + [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], + State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. + +needs_sync(#vqstate { on_sync = {_, _, []} }) -> + false; +needs_sync(_) -> + true. %%---------------------------------------------------------------------------- |
