summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-11-03 13:49:50 +0200
committerMichael Klishin <michael@clojurewerkz.org>2017-11-03 13:49:50 +0200
commit9257d5e7b53ed3bb65dee09d42e3f570e090b55a (patch)
tree273897f05f06c93d264fdcfe1b7020c682c6498f
parentda2b0ccda34e85048b35d2633836d2ec708dd488 (diff)
parente811e30fff3ff9170c8103d100e24b4c0469f5ff (diff)
downloadrabbitmq-server-git-9257d5e7b53ed3bb65dee09d42e3f570e090b55a.tar.gz
Merge branch 'master' into rabbitmq-server-995
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_priority_queue.erl5
-rw-r--r--src/rabbit_variable_queue.erl27
-rw-r--r--test/channel_operation_timeout_test_queue.erl9
5 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b51375f9fb..a3c8f99519 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1516,9 +1516,9 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- put(waiting_bump, false),
- noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+ backing_queue_state = BQS0}) ->
+ BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
+ noreply(State#q{backing_queue_state = BQ:resume(BQS1)});
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index fefa0de1c9..8dd8a44c5b 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -24,7 +24,7 @@
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4]).
+ zip_msgs_and_acks/4, handle_info/2]).
-export([start/2, stop/1, delete_crashed/1]).
@@ -447,6 +447,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+handle_info(Msg, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }.
+
resume(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:resume(BQS) }.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 9f9ef5adca..7c7bd8fbe6 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -41,7 +41,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4]).
+ zip_msgs_and_acks/4, handle_info/2]).
-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
@@ -393,6 +393,9 @@ handle_pre_hibernate(State = #state{bq = BQ}) ->
handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(handle_pre_hibernate(BQS)).
+handle_info(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(handle_info(Msg, BQS)).
+
resume(State = #state{bq = BQ}) ->
foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State);
resume(State = #passthrough{bq = BQ, bqs = BQS}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49f4d8d3ed..aed3c9d7c9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -26,7 +26,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4, multiple_routing_keys/0]).
+ zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).
-export([start/2, stop/1]).
@@ -325,7 +325,8 @@
memory_reduction_run_count,
%% Queue data is grouped by VHost. We need to store it
%% to work with queue index.
- virtual_host
+ virtual_host,
+ waiting_bump = false
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -911,6 +912,9 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
+ State#vqstate{ waiting_bump = false }.
+
resume(State) -> a(reduce_memory_use(State)).
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
@@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate {
Blocked = credit_flow:blocked(),
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
%% Credit bump will continue paging
- {true, _} -> ok;
+ {true, _} -> State3;
%% Finished with paging
- {false, false} -> ok;
+ {false, false} -> State3;
%% Planning next batch
{false, true} ->
%% We don't want to use self-credit-flow, because it's harder to
%% reason about. So the process sends a (prioritised) message to
%% itself and sets a waiting_bump value to keep the message box clean
- case get(waiting_bump) of
- true -> ok;
- _ -> self() ! bump_reduce_memory_use,
- put(waiting_bump, true)
- end
- end,
- State3;
+ maybe_bump_reduce_memory_use(State3)
+ end;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
@@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate {
garbage_collect(),
State3.
+maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
+ State;
+maybe_bump_reduce_memory_use(State) ->
+ self() ! bump_reduce_memory_use,
+ State#vqstate{ waiting_bump = true }.
+
limit_ram_acks(0, State) ->
{0, ui(State)};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 247477bf40..59de0cb5c7 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -26,8 +26,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4]).
--export([start/2, stop/1]).
+ start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).
%%----------------------------------------------------------------------------
%% This test backing queue follows the variable queue implementation, with
@@ -91,7 +90,8 @@
memory_reduction_run_count,
%% Queue data is grouped by VHost. We need to store it
%% to work with queue index.
- virtual_host
+ virtual_host,
+ waiting_bump = false
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -285,6 +285,9 @@ timeout(State) ->
handle_pre_hibernate(State) ->
rabbit_variable_queue:handle_pre_hibernate(State).
+handle_info(Msg, State) ->
+ rabbit_variable_queue:handle_info(Msg, State).
+
resume(State) -> rabbit_variable_queue:resume(State).
msg_rates(State) ->