diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 25 |
4 files changed, 111 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 833dada4fb..82a0f5b4fa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,7 +33,7 @@ -export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1]). +-export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). @@ -108,10 +108,12 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(tx_commit_msg_store_callback/4 :: (pid(), [message()], [acktag()], - {pid(), any()}) -> 'ok'). +-spec(tx_commit_msg_store_callback/4 :: + (pid(), [message()], [acktag()], {pid(), any()}) -> 'ok'). +-spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). +-spec(remeasure_egress_rate/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -370,6 +372,9 @@ internal_delete(QueueName) -> end end). +remeasure_egress_rate(QPid) -> + gen_server2:pcast(QPid, 8, remeasure_egress_rate). + prune_queue_childspecs() -> lists:foreach( fun ({Name, undefined, _Type, _Mods}) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9d27fd0f32..cd70979a1f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,10 +35,11 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(EGRESS_REMEASURE_INTERVAL, 5000). -export([start_link/1]). @@ -58,7 +59,8 @@ next_msg_id, active_consumers, blocked_consumers, - sync_timer_ref + sync_timer_ref, + egress_rate_timer_ref }). -record(consumer, {tag, ack_required}). @@ -112,7 +114,8 @@ init(Q = #amqqueue { name = QName }) -> next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), - sync_timer_ref = undefined + sync_timer_ref = undefined, + egress_rate_timer_ref = undefined }, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -151,7 +154,8 @@ noreply(NewState) -> {noreply, NewState1, Timeout}. next_state(State = #q{variable_queue_state = VQS}) -> - next_state1(State, rabbit_variable_queue:needs_sync(VQS)). + next_state1(ensure_egress_rate_timer(State), + rabbit_variable_queue:needs_sync(VQS)). next_state1(State = #q{sync_timer_ref = undefined}, true) -> {start_sync_timer(State), 0}; @@ -160,12 +164,29 @@ next_state1(State, true) -> next_state1(State = #q{sync_timer_ref = undefined, variable_queue_state = VQS}, false) -> {State, case rabbit_variable_queue:can_flush_journal(VQS) of - true -> 0; + true -> 0; false -> hibernate end}; next_state1(State, false) -> {stop_sync_timer(State), 0}. +ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after(?EGRESS_REMEASURE_INTERVAL, rabbit_amqqueue, + remeasure_egress_rate, [self()]), + State#q{egress_rate_timer_ref = TRef}; +ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) -> + State#q{egress_rate_timer_ref = undefined}; +ensure_egress_rate_timer(State) -> + State. + +stop_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) -> + State; +stop_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) -> + State#q{egress_rate_timer_ref = undefined}; +stop_egress_rate_timer(State = #q{egress_rate_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{egress_rate_timer_ref = undefined}. + start_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue, tx_commit_vq_callback, [self()]), @@ -848,7 +869,12 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) -> + noreply(State#q{egress_rate_timer_ref = just_measured, + variable_queue_state = + rabbit_variable_queue:remeasure_egress_rate(VQS)}). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -886,6 +912,7 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_pre_hibernate(State = #q { variable_queue_state = VQS }) -> +handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) -> VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS), - {hibernate, State #q { variable_queue_state = VQS1 }}. + {hibernate, stop_egress_rate_timer( + State#q{ variable_queue_state = VQS1 })}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c84de421dd..9b53334eb7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1110,3 +1110,49 @@ test_queue_index() -> ok = rabbit_queue_index:start_msg_store([]), ok = stop_msg_store(), passed. + +variable_queue_publish(IsPersistent, Count, VQ) -> + lists:foldl( + fun (_N, {Acc, VQ1}) -> + {SeqId, VQ2} = rabbit_variable_queue:publish( + rabbit_basic:message( + <<>>, <<>>, [], <<>>, rabbit_guid:guid(), + IsPersistent), VQ1), + {[SeqId | Acc], VQ2} + end, {[], VQ}, lists:seq(1, Count)). + +test_variable_queue() -> + SegmentSize = rabbit_queue_index:segment_size(), + stop_msg_store(), + ok = empty_test_queue(), + VQ0 = rabbit_variable_queue:init(test_queue()), + S0 = rabbit_variable_queue:status(VQ0), + 0 = proplists:get_value(len, S0), + false = proplists:get_value(prefetching, S0), + + VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0), + 0 = proplists:get_value(target_ram_msg_count, + rabbit_variable_queue:status(VQ1)), + + {SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1), + S2 = rabbit_variable_queue:status(VQ2), + TwoSegments = 2*SegmentSize, + {gamma, SegmentSize, TwoSegments} = proplists:get_value(gamma, S2), + SegmentSize = proplists:get_value(q3, S2), + ThreeSegments = 3*SegmentSize, + ThreeSegments = proplists:get_value(len, S2), + + VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), + io:format("~p~n", [rabbit_variable_queue:status(VQ3)]), + {{Msg, false, AckTag, Len1} = Obj, VQ4} = + rabbit_variable_queue:fetch(VQ3), + io:format("~p~n", [Obj]), + timer:sleep(1000), + VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4), + VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5), + io:format("~p~n", [rabbit_variable_queue:status(VQ6)]), + {{Msg1, false, AckTag1, Len11} = Obj1, VQ7} = + rabbit_variable_queue:fetch(VQ6), + io:format("~p~n", [Obj1]), + io:format("~p~n", [rabbit_variable_queue:status(VQ7)]), + passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7851d8f6a6..af8a4775a4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,7 +36,7 @@ ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, - can_flush_journal/1, flush_journal/1]). + can_flush_journal/1, flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -189,7 +189,8 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, %% incorporates the last two values, and not the current value and %% the last average. Averaging helps smooth out spikes. Now = now(), - EgressRate = OutCount / timer:now_diff(Now, Timestamp), + %% EgressRate is in seconds, and now_diff is in microseconds + EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp), AvgEgressRate = (EgressRate + OldEgressRate) / 2, State #vqstate { egress_rate = EgressRate, avg_egress_rate = AvgEgressRate, @@ -420,6 +421,21 @@ flush_journal(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush_journal(IndexState) }. +status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, + len = Len, on_sync = {_, _, From}, + target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, prefetcher = Prefetcher }) -> + [ {q1, queue:len(Q1)}, + {q2, queue:len(Q2)}, + {gamma, Gamma}, + {q3, queue:len(Q3)}, + {q4, Q4}, + {len, Len}, + {outstanding_txns, length(From)}, + {target_ram_msg_count, TargetRamMsgCount}, + {ram_msg_count, RamMsgCount}, + {prefetching, Prefetcher /= undefined} ]. + %%---------------------------------------------------------------------------- persistent_msg_ids(Pubs) -> @@ -895,8 +911,5 @@ combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B; combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A; combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow }, #gamma { seq_id = SeqIdHigh, count = CountHigh}) -> - true = SeqIdLow + CountLow =< SeqIdHigh, %% ASSERTION - %% note the above assertion does not say ==. This is because acks - %% may mean that the counts are not straight multiples of - %% segment_size. + true = SeqIdLow =< SeqIdHigh, %% ASSERTION #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}. |
