summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-02 18:11:18 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-02 18:11:18 +0000
commitb10788b74928824690cddee5cfc8da930c7282ef (patch)
tree07e4591664a25d2a50a13179f6cdc7df326ab2d6 /src
parentbf1029db49ed642f270c78f0420e6aec691f1d95 (diff)
downloadrabbitmq-server-git-b10788b74928824690cddee5cfc8da930c7282ef.tar.gz
started work on properly testing the vq. Caught several bugs already. Also hooked in the remeasuring of the egress rate of the variable queue, which will also eventually form the driver to inform the memory_manager. eventually.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_tests.erl46
-rw-r--r--src/rabbit_variable_queue.erl25
4 files changed, 111 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 833dada4fb..82a0f5b4fa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,7 +33,7 @@
-export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3,
purge/1]).
--export([internal_declare/2, internal_delete/1]).
+-export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
@@ -108,10 +108,12 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(tx_commit_msg_store_callback/4 :: (pid(), [message()], [acktag()],
- {pid(), any()}) -> 'ok').
+-spec(tx_commit_msg_store_callback/4 ::
+ (pid(), [message()], [acktag()], {pid(), any()}) -> 'ok').
+-spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
+-spec(remeasure_egress_rate/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -370,6 +372,9 @@ internal_delete(QueueName) ->
end
end).
+remeasure_egress_rate(QPid) ->
+ gen_server2:pcast(QPid, 8, remeasure_egress_rate).
+
prune_queue_childspecs() ->
lists:foreach(
fun ({Name, undefined, _Type, _Mods}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9d27fd0f32..cd70979a1f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,10 +35,11 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(EGRESS_REMEASURE_INTERVAL, 5000).
-export([start_link/1]).
@@ -58,7 +59,8 @@
next_msg_id,
active_consumers,
blocked_consumers,
- sync_timer_ref
+ sync_timer_ref,
+ egress_rate_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -112,7 +114,8 @@ init(Q = #amqqueue { name = QName }) ->
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
- sync_timer_ref = undefined
+ sync_timer_ref = undefined,
+ egress_rate_timer_ref = undefined
},
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -151,7 +154,8 @@ noreply(NewState) ->
{noreply, NewState1, Timeout}.
next_state(State = #q{variable_queue_state = VQS}) ->
- next_state1(State, rabbit_variable_queue:needs_sync(VQS)).
+ next_state1(ensure_egress_rate_timer(State),
+ rabbit_variable_queue:needs_sync(VQS)).
next_state1(State = #q{sync_timer_ref = undefined}, true) ->
{start_sync_timer(State), 0};
@@ -160,12 +164,29 @@ next_state1(State, true) ->
next_state1(State = #q{sync_timer_ref = undefined,
variable_queue_state = VQS}, false) ->
{State, case rabbit_variable_queue:can_flush_journal(VQS) of
- true -> 0;
+ true -> 0;
false -> hibernate
end};
next_state1(State, false) ->
{stop_sync_timer(State), 0}.
+ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?EGRESS_REMEASURE_INTERVAL, rabbit_amqqueue,
+ remeasure_egress_rate, [self()]),
+ State#q{egress_rate_timer_ref = TRef};
+ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) ->
+ State#q{egress_rate_timer_ref = undefined};
+ensure_egress_rate_timer(State) ->
+ State.
+
+stop_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) ->
+ State;
+stop_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) ->
+ State#q{egress_rate_timer_ref = undefined};
+stop_egress_rate_timer(State = #q{egress_rate_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{egress_rate_timer_ref = undefined}.
+
start_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue,
tx_commit_vq_callback, [self()]),
@@ -848,7 +869,12 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) ->
+ noreply(State#q{egress_rate_timer_ref = just_measured,
+ variable_queue_state =
+ rabbit_variable_queue:remeasure_egress_rate(VQS)}).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -886,6 +912,7 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-handle_pre_hibernate(State = #q { variable_queue_state = VQS }) ->
+handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) ->
VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS),
- {hibernate, State #q { variable_queue_state = VQS1 }}.
+ {hibernate, stop_egress_rate_timer(
+ State#q{ variable_queue_state = VQS1 })}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c84de421dd..9b53334eb7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1110,3 +1110,49 @@ test_queue_index() ->
ok = rabbit_queue_index:start_msg_store([]),
ok = stop_msg_store(),
passed.
+
+variable_queue_publish(IsPersistent, Count, VQ) ->
+ lists:foldl(
+ fun (_N, {Acc, VQ1}) ->
+ {SeqId, VQ2} = rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ <<>>, <<>>, [], <<>>, rabbit_guid:guid(),
+ IsPersistent), VQ1),
+ {[SeqId | Acc], VQ2}
+ end, {[], VQ}, lists:seq(1, Count)).
+
+test_variable_queue() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ stop_msg_store(),
+ ok = empty_test_queue(),
+ VQ0 = rabbit_variable_queue:init(test_queue()),
+ S0 = rabbit_variable_queue:status(VQ0),
+ 0 = proplists:get_value(len, S0),
+ false = proplists:get_value(prefetching, S0),
+
+ VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0),
+ 0 = proplists:get_value(target_ram_msg_count,
+ rabbit_variable_queue:status(VQ1)),
+
+ {SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1),
+ S2 = rabbit_variable_queue:status(VQ2),
+ TwoSegments = 2*SegmentSize,
+ {gamma, SegmentSize, TwoSegments} = proplists:get_value(gamma, S2),
+ SegmentSize = proplists:get_value(q3, S2),
+ ThreeSegments = 3*SegmentSize,
+ ThreeSegments = proplists:get_value(len, S2),
+
+ VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
+ io:format("~p~n", [rabbit_variable_queue:status(VQ3)]),
+ {{Msg, false, AckTag, Len1} = Obj, VQ4} =
+ rabbit_variable_queue:fetch(VQ3),
+ io:format("~p~n", [Obj]),
+ timer:sleep(1000),
+ VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4),
+ VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5),
+ io:format("~p~n", [rabbit_variable_queue:status(VQ6)]),
+ {{Msg1, false, AckTag1, Len11} = Obj1, VQ7} =
+ rabbit_variable_queue:fetch(VQ6),
+ io:format("~p~n", [Obj1]),
+ io:format("~p~n", [rabbit_variable_queue:status(VQ7)]),
+ passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7851d8f6a6..af8a4775a4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,7 +36,7 @@
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
- can_flush_journal/1, flush_journal/1]).
+ can_flush_journal/1, flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -189,7 +189,8 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
%% incorporates the last two values, and not the current value and
%% the last average. Averaging helps smooth out spikes.
Now = now(),
- EgressRate = OutCount / timer:now_diff(Now, Timestamp),
+ %% EgressRate is in seconds, and now_diff is in microseconds
+ EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp),
AvgEgressRate = (EgressRate + OldEgressRate) / 2,
State #vqstate { egress_rate = EgressRate,
avg_egress_rate = AvgEgressRate,
@@ -420,6 +421,21 @@ flush_journal(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
rabbit_queue_index:flush_journal(IndexState) }.
+status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
+ len = Len, on_sync = {_, _, From},
+ target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount, prefetcher = Prefetcher }) ->
+ [ {q1, queue:len(Q1)},
+ {q2, queue:len(Q2)},
+ {gamma, Gamma},
+ {q3, queue:len(Q3)},
+ {q4, Q4},
+ {len, Len},
+ {outstanding_txns, length(From)},
+ {target_ram_msg_count, TargetRamMsgCount},
+ {ram_msg_count, RamMsgCount},
+ {prefetching, Prefetcher /= undefined} ].
+
%%----------------------------------------------------------------------------
persistent_msg_ids(Pubs) ->
@@ -895,8 +911,5 @@ combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B;
combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A;
combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow },
#gamma { seq_id = SeqIdHigh, count = CountHigh}) ->
- true = SeqIdLow + CountLow =< SeqIdHigh, %% ASSERTION
- %% note the above assertion does not say ==. This is because acks
- %% may mean that the counts are not straight multiples of
- %% segment_size.
+ true = SeqIdLow =< SeqIdHigh, %% ASSERTION
#gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}.