diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-11-03 13:49:50 +0200 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-11-03 13:49:50 +0200 |
| commit | 9257d5e7b53ed3bb65dee09d42e3f570e090b55a (patch) | |
| tree | 273897f05f06c93d264fdcfe1b7020c682c6498f | |
| parent | da2b0ccda34e85048b35d2633836d2ec708dd488 (diff) | |
| parent | e811e30fff3ff9170c8103d100e24b4c0469f5ff (diff) | |
| download | rabbitmq-server-git-9257d5e7b53ed3bb65dee09d42e3f570e090b55a.tar.gz | |
Merge branch 'master' into rabbitmq-server-995
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 27 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 9 |
5 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b51375f9fb..a3c8f99519 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1516,9 +1516,9 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - put(waiting_bump, false), - noreply(State#q{backing_queue_state = BQ:resume(BQS)}); + backing_queue_state = BQS0}) -> + BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0), + noreply(State#q{backing_queue_state = BQ:resume(BQS1)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index fefa0de1c9..8dd8a44c5b 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -24,7 +24,7 @@ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4]). + zip_msgs_and_acks/4, handle_info/2]). -export([start/2, stop/1, delete_crashed/1]). @@ -447,6 +447,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +handle_info(Msg, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }. + resume(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:resume(BQS) }. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 9f9ef5adca..7c7bd8fbe6 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -41,7 +41,7 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4]). + zip_msgs_and_acks/4, handle_info/2]). -record(state, {bq, bqss, max_priority}). -record(passthrough, {bq, bqs}). @@ -393,6 +393,9 @@ handle_pre_hibernate(State = #state{bq = BQ}) -> handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(handle_pre_hibernate(BQS)). +handle_info(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(handle_info(Msg, BQS)). + resume(State = #state{bq = BQ}) -> foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State); resume(State = #passthrough{bq = BQ, bqs = BQS}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49f4d8d3ed..aed3c9d7c9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -26,7 +26,7 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4, multiple_routing_keys/0]). + zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]). -export([start/2, stop/1]). @@ -325,7 +325,8 @@ memory_reduction_run_count, %% Queue data is grouped by VHost. We need to store it %% to work with queue index. - virtual_host + virtual_host, + waiting_bump = false }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -911,6 +912,9 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) -> + State#vqstate{ waiting_bump = false }. + resume(State) -> a(reduce_memory_use(State)). msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, @@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate { Blocked = credit_flow:blocked(), case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of %% Credit bump will continue paging - {true, _} -> ok; + {true, _} -> State3; %% Finished with paging - {false, false} -> ok; + {false, false} -> State3; %% Planning next batch {false, true} -> %% We don't want to use self-credit-flow, because it's harder to %% reason about. So the process sends a (prioritised) message to %% itself and sets a waiting_bump value to keep the message box clean - case get(waiting_bump) of - true -> ok; - _ -> self() ! bump_reduce_memory_use, - put(waiting_bump, true) - end - end, - State3; + maybe_bump_reduce_memory_use(State3) + end; %% When using lazy queues, there are no alphas, so we don't need to %% call push_alphas_to_betas/2. reduce_memory_use(State = #vqstate { @@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate { garbage_collect(), State3. +maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) -> + State; +maybe_bump_reduce_memory_use(State) -> + self() ! bump_reduce_memory_use, + State#vqstate{ waiting_bump = true }. + limit_ram_acks(0, State) -> {0, ui(State)}; limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 247477bf40..59de0cb5c7 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -26,8 +26,7 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4]). --export([start/2, stop/1]). + start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]). %%---------------------------------------------------------------------------- %% This test backing queue follows the variable queue implementation, with @@ -91,7 +90,8 @@ memory_reduction_run_count, %% Queue data is grouped by VHost. We need to store it %% to work with queue index. - virtual_host + virtual_host, + waiting_bump = false }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -285,6 +285,9 @@ timeout(State) -> handle_pre_hibernate(State) -> rabbit_variable_queue:handle_pre_hibernate(State). +handle_info(Msg, State) -> + rabbit_variable_queue:handle_info(Msg, State). + resume(State) -> rabbit_variable_queue:resume(State). msg_rates(State) -> |
