diff options
| -rw-r--r-- | src/rabbit_tests.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 13 |
2 files changed, 18 insertions, 8 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 540ea2f4ef..c684484d74 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1812,7 +1812,8 @@ test_variable_queue_partial_segments_delta_thing() -> VQ0 = fresh_variable_queue(), VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + VQ3 = variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(0, VQ2)), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), @@ -1821,7 +1822,8 @@ test_variable_queue_partial_segments_delta_thing() -> assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), - VQ5 = variable_queue_publish(true, 1, VQ4), + VQ5 = variable_queue_wait_for_shuffling_end( + variable_queue_publish(true, 1, VQ4)), %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), io:format("~p~n", [S5]), @@ -1848,6 +1850,13 @@ test_variable_queue_partial_segments_delta_thing() -> passed. +variable_queue_wait_for_shuffling_end(VQ) -> + case rabbit_variable_queue:needs_idle_timeout(VQ) of + true -> variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:idle_timeout(VQ)); + false -> VQ + end. + test_queue_recover() -> Count = 2*rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d5d48e58b9..1a9301c0af 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -627,12 +627,14 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_idle_timeout(State = #vqstate { on_sync = {_, _, []}, - ram_index_count = RamIndexCount }) -> +needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns}, + target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount }) + when SFuns =/= [] orelse RamMsgCount > TargetRamMsgCount -> + true; +needs_idle_timeout(State = #vqstate { ram_index_count = RamIndexCount }) -> Permitted = permitted_ram_index_count(State), - Permitted =:= infinity orelse RamIndexCount =< Permitted; -needs_idle_timeout(_) -> - true. + Permitted =/= infinity andalso RamIndexCount > Permitted. idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). @@ -669,7 +671,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, - target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> E1 = queue:is_empty(Q1), |
