summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-23 18:13:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-23 18:13:16 +0100
commitd12aed0607ee8d49f5dcfab0f59226b676541f63 (patch)
treeaa7820a1f8f0735580c0d4ab144085059345cbce /src
parent715b396ca692d3e57ff2a7445a7c126529026569 (diff)
downloadrabbitmq-server-git-d12aed0607ee8d49f5dcfab0f59226b676541f63.tar.gz
in queue tx coalescing is in. It works too - doubling the number of producers does not halve the tx commit rate for each producer. It does go down slightly, on each doubling, but appears more log like. Also, debugging shows that the coalescing really is working
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_variable_queue.erl32
3 files changed, 71 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d0a5f205a8..f421d6aa12 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,8 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4]).
+-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4,
+ tx_commit_vq_callback/1]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -326,6 +327,9 @@ tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) ->
gen_server2:pcast(QPid, 8,
{tx_commit_msg_store_callback, Pubs, AckTags, From}).
+tx_commit_vq_callback(QPid) ->
+ gen_server2:pcast(QPid, 8, tx_commit_vq_callback).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 434652a58b..1e37a98ffa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,6 +38,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(SYNC_INTERVAL, 5). %% milliseconds
-export([start_link/1]).
@@ -56,7 +57,8 @@
variable_queue_state,
next_msg_id,
active_consumers,
- blocked_consumers
+ blocked_consumers,
+ sync_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -109,7 +111,8 @@ init(Q = #amqqueue { name = QName }) ->
variable_queue_state = VQS,
next_msg_id = 1,
active_consumers = queue:new(),
- blocked_consumers = queue:new()
+ blocked_consumers = queue:new(),
+ sync_timer_ref = undefined
},
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -142,11 +145,34 @@ code_change(_OldVsn, State, _Extra) ->
reply(Reply, NewState) ->
assert_invariant(NewState),
- {reply, Reply, NewState, hibernate}.
+ {NewState1, Timeout} = next_state(NewState),
+ {reply, Reply, NewState1, Timeout}.
noreply(NewState) ->
assert_invariant(NewState),
- {noreply, NewState, hibernate}.
+ {NewState1, Timeout} = next_state(NewState),
+ {noreply, NewState1, Timeout}.
+
+next_state(State = #q { variable_queue_state = VQS }) ->
+ next_state1(State, rabbit_variable_queue:needs_sync(VQS)).
+
+next_state1(State = #q { sync_timer_ref = undefined }, true) ->
+ {start_sync_timer(State), 0};
+next_state1(State, true) ->
+ {State, 0};
+next_state1(State = #q { sync_timer_ref = undefined }, false) ->
+ {State, hibernate};
+next_state1(State, false) ->
+ {stop_sync_timer(State), 0}.
+
+start_sync_timer(State = #q { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue,
+ tx_commit_vq_callback, [self()]),
+ State #q { sync_timer_ref = TRef }.
+
+stop_sync_timer(State = #q { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #q { sync_timer_ref = undefined }.
assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) ->
true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)).
@@ -791,10 +817,15 @@ handle_cast({notify_sent, ChPid}, State) ->
handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From},
State = #q{variable_queue_state = VQS}) ->
noreply(
+ State#q{variable_queue_state =
+ rabbit_variable_queue:tx_commit_from_msg_store(
+ Pubs, AckTags, From, VQS)});
+
+handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) ->
+ noreply(
run_message_queue(
State#q{variable_queue_state =
- rabbit_variable_queue:tx_commit_from_msg_store(
- Pubs, AckTags, From, VQS)}));
+ rabbit_variable_queue:tx_commit_from_vq(VQS)}));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -828,6 +859,12 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
+ noreply(
+ run_message_queue(
+ State#q{variable_queue_state =
+ rabbit_variable_queue:tx_commit_from_vq(VQS)}));
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d9520d0080..33e09c113d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,8 @@
-export([init/1, terminate/1, publish/2, publish_delivered/2,
set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
- requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4]).
+ requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
+ tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1]).
%%----------------------------------------------------------------------------
@@ -54,7 +55,8 @@
avg_egress_rate,
egress_rate_timestamp,
prefetcher,
- len
+ len,
+ on_sync
}).
-record(alpha,
@@ -136,7 +138,8 @@ init(QueueName) ->
avg_egress_rate = 0,
egress_rate_timestamp = now(),
prefetcher = undefined,
- len = GammaCount
+ len = GammaCount,
+ on_sync = {[], [], []}
},
maybe_load_next_segment(State).
@@ -378,10 +381,16 @@ tx_commit(Pubs, AckTags, From, State) ->
{false, State}
end.
-tx_commit_from_msg_store(Pubs, AckTags, From, State) ->
+tx_commit_from_msg_store(Pubs, AckTags, From,
+ State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
DiskAcks =
lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags),
- State1 = ack(DiskAcks, State),
+ State #vqstate { on_sync = { [DiskAcks | SAcks],
+ [Pubs | SPubs],
+ [From | SFroms] }}.
+
+tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
+ State1 = ack(lists:flatten(SAcks), State),
{PubSeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -392,11 +401,16 @@ tx_commit_from_msg_store(Pubs, AckTags, From, State) ->
false -> SeqIdsAcc
end,
{SeqIdsAcc1, StateN1}
- end, {[], State1}, Pubs),
+ end, {[], State1}, lists:flatten(lists:reverse(SPubs))),
IndexState1 =
- rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= DiskAcks, IndexState),
- gen_server2:reply(From, ok),
- State2 #vqstate { index_state = IndexState1 }.
+ rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= SAcks, IndexState),
+ [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
+ State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
+
+needs_sync(#vqstate { on_sync = {_, _, []} }) ->
+ false;
+needs_sync(_) ->
+ true.
%%----------------------------------------------------------------------------