diff options
| -rw-r--r-- | include/rabbit_backing_queue_type_spec.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_type.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 54 |
5 files changed, 35 insertions, 41 deletions
diff --git a/include/rabbit_backing_queue_type_spec.hrl b/include/rabbit_backing_queue_type_spec.hrl index ac47ccba3d..afb549187e 100644 --- a/include/rabbit_backing_queue_type_spec.hrl +++ b/include/rabbit_backing_queue_type_spec.hrl @@ -49,8 +49,7 @@ -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: (('undefined' | 'infinity' | number()), state()) -> state()). --spec(update_ram_duration/1 :: (state()) -> state()). --spec(ram_duration/1 :: (state()) -> number()). +-spec(ram_duration/1 :: (state()) -> {number(), state()}). -spec(sync_callback/1 :: (state()) -> ('undefined' | (fun ((A) -> {boolean(), A})))). -spec(handle_pre_hibernate/1 :: (state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a4d653e289..9697cc1347 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -932,8 +932,7 @@ handle_cast({flush, ChPid}, State) -> handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQS1 = BQ:update_ram_duration(BQS), - RamDuration = BQ:ram_duration(BQS1), + {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), diff --git a/src/rabbit_backing_queue_type.erl b/src/rabbit_backing_queue_type.erl index 526152f143..8e3cce147a 100644 --- a/src/rabbit_backing_queue_type.erl +++ b/src/rabbit_backing_queue_type.erl @@ -104,12 +104,10 @@ behaviour_info(callbacks) -> %% by the duration and the current queue rates. {set_ram_duration_target, 2}, - %% Recalculate the duration internally (likely to be just update - %% your internal rates). - {update_ram_duration, 1}, - - %% Report how many seconds the messages in RAM represent given - %% the current rates of the queue. + %% Optionally recalculate the duration internally (likely to be + %% just update your internal rates), and report how many seconds + %% the messages in RAM represent given the current rates of the + %% queue. {ram_duration, 1}, %% Can return 'undefined' or a thunk which will receive the diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 66f2d3cc3f..b186538ba1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1384,7 +1384,7 @@ test_variable_queue_dynamic_duration_change() -> %% start by sending in a couple of segments worth Len1 = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len1, VQ0), - VQ2 = rabbit_variable_queue:update_ram_duration(VQ1), + {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), {ok, _TRef} = timer:send_after(1000, {duration, 60, fun (V) -> (V*0.75)-1 end}), VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2), @@ -1420,7 +1420,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> _ -> Fun end, {ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}), - VQ4 = rabbit_variable_queue:update_ram_duration(VQ3), + {_Duration, VQ4} = rabbit_variable_queue:ram_duration(VQ3), VQ5 = %% /37 otherwise the duration is just to high to stress things rabbit_variable_queue:set_ram_duration_target(N/37, VQ4), io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]), @@ -1434,7 +1434,7 @@ test_variable_queue_partial_segments_delta_thing() -> HalfSegment = SegmentSize div 2, VQ0 = fresh_variable_queue(), VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), - VQ2 = rabbit_variable_queue:update_ram_duration(VQ1), + {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 164533b716..bbf78bb7e9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_variable_queue). -export([init/2, terminate/1, publish/2, publish_delivered/2, - set_ram_duration_target/2, update_ram_duration/1, - ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, + set_ram_duration_target/2, ram_duration/1, + fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, sync_callback/1, handle_pre_hibernate/1, status/1]). @@ -372,36 +372,34 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end. -update_ram_duration(State = #vqstate { egress_rate = Egress, - ingress_rate = Ingress, - rate_timestamp = Timestamp, - in_counter = InCount, - out_counter = OutCount, - ram_msg_count = RamMsgCount, - duration_target = DurationTarget }) -> +ram_duration(State = #vqstate { egress_rate = Egress, + ingress_rate = Ingress, + rate_timestamp = Timestamp, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget, + ram_msg_count_prev = RamMsgCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - set_ram_duration_target( - DurationTarget, - State #vqstate { egress_rate = Egress1, - avg_egress_rate = AvgEgressRate, - ingress_rate = Ingress1, - avg_ingress_rate = AvgIngressRate, - rate_timestamp = Now, - ram_msg_count_prev = RamMsgCount, - out_counter = 0, in_counter = 0 }). - -ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev }) -> - %% msgs / (msgs/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of - true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / (2 * (AvgEgressRate + AvgIngressRate)) - end. + Duration = %% msgs / (msgs/sec) == sec + case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + true -> infinity; + false -> (RamMsgCountPrev + RamMsgCount) / + (2 * (AvgEgressRate + AvgIngressRate)) + end, + + {Duration, set_ram_duration_target( + DurationTarget, + State #vqstate { egress_rate = Egress1, + avg_egress_rate = AvgEgressRate, + ingress_rate = Ingress1, + avg_ingress_rate = AvgIngressRate, + rate_timestamp = Now, + ram_msg_count_prev = RamMsgCount, + out_counter = 0, in_counter = 0 })}. fetch(State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, |
