summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-01-05 15:59:52 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-01-05 15:59:52 +0100
commite1bb001582da62718fc5339490813ab53045a719 (patch)
treebcddace3e64cea0d18d435df1948afad5f95ef21
parent4e174559130bac052a269b52606b525280d6212f (diff)
parent62d8e67571af1f5e90e1e5864c5f7ca101d2c0c2 (diff)
downloadrabbitmq-server-git-e1bb001582da62718fc5339490813ab53045a719.tar.gz
Merge branch 'stable'
-rw-r--r--Makefile3
-rw-r--r--src/rabbit_variable_queue.erl53
-rw-r--r--test/unit_inbroker_SUITE.erl12
-rw-r--r--test/worker_pool_SUITE.erl15
4 files changed, 49 insertions, 34 deletions
diff --git a/Makefile b/Makefile
index f87a09ec7a..5eee081181 100644
--- a/Makefile
+++ b/Makefile
@@ -110,7 +110,8 @@ define PROJECT_ENV
]},
%% rabbitmq-server-973
- {lazy_queue_explicit_gc_run_operation_threshold, 250},
+ {queue_explicit_gc_run_operation_threshold, 1000},
+ {lazy_queue_explicit_gc_run_operation_threshold, 1000},
{background_gc_enabled, true},
{background_gc_target_interval, 60000}
]
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9dbd4fdbf2..5581143e69 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -447,21 +447,28 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
-
%% we define the garbage collector threshold
-%% it needs to tune the GC calls inside `reduce_memory_use`
-%% see: rabbitmq-server-973 and `maybe_execute_gc` function
--define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
--define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection.
+%% see: rabbitmq-server-973 and rabbitmq-server-964
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
case get(explicit_gc_run_operation_threshold) of
undefined ->
- Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
- ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ Val = explicit_gc_run_operation_threshold_for_mode(Mode),
put(explicit_gc_run_operation_threshold, Val),
Val;
Val -> Val
end).
+explicit_gc_run_operation_threshold_for_mode(Mode) ->
+ {Key, Fallback} = case Mode of
+ lazy -> {lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD};
+ _ -> {queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}
+ end,
+ rabbit_misc:get_env(rabbit, Key, Fallback).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -637,27 +644,27 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- a(reduce_memory_use(maybe_update_rates(State1))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State1))).
batch_publish(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, State1} =
lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
State2 = ui(State1),
- a(reduce_memory_use(maybe_update_rates(State2))).
+ a(maybe_reduce_memory_use(maybe_update_rates(State2))).
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
{SeqId, State1} =
publish_delivered1(Msg, MsgProps, ChPid, Flow,
fun maybe_write_to_disk/4,
State),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+ {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
{ChPid, Flow, SeqIds, State1} =
lists:foldl(fun batch_publish_delivered1/2,
{ChPid, Flow, [], State}, Publishes),
State2 = ui(State1),
- {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+ {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -761,7 +768,7 @@ requeue(AckTags, #vqstate { mode = default,
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
State2),
MsgCount = length(MsgIds2),
- {MsgIds2, a(reduce_memory_use(
+ {MsgIds2, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State3 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -779,7 +786,7 @@ requeue(AckTags, #vqstate { mode = lazy,
{Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
State1),
MsgCount = length(MsgIds1),
- {MsgIds1, a(reduce_memory_use(
+ {MsgIds1, a(maybe_reduce_memory_use(
maybe_update_rates(ui(
State2 #vqstate { delta = Delta1,
q3 = Q3a,
@@ -829,7 +836,7 @@ set_ram_duration_target(
(TargetRamCount =/= infinity andalso
TargetRamCount1 >= TargetRamCount) of
true -> State1;
- false -> reduce_memory_use(State1)
+ false -> maybe_reduce_memory_use(State1)
end).
maybe_update_rates(State = #vqstate{ in_counter = InCount,
@@ -911,7 +918,7 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-resume(State) -> a(reduce_memory_use(State)).
+resume(State) -> a(maybe_reduce_memory_use(State)).
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
@@ -2364,12 +2371,12 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
-maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
- case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
- true -> garbage_collect(),
- State#vqstate{memory_reduction_run_count = 0};
- false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
-
+maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
+ mode = Mode}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
+ true -> State1 = reduce_memory_use(State),
+ State1#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
end.
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
@@ -2384,7 +2391,6 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
-
State1 = #vqstate { q2 = Q2, q3 = Q3 } =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> State;
@@ -2444,7 +2450,8 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- maybe_execute_gc(State3).
+ garbage_collect(),
+ State3.
limit_ram_acks(0, State) ->
{0, ui(State)};
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index af86371fc2..91a3eb32a6 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -237,9 +237,21 @@ orelse Group =:= backing_queue_embed_limit_1024 ->
end_per_group1(_, Config) ->
Config.
+init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
+ Testcase == variable_queue_fold ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, application, set_env,
+ [rabbit, queue_explicit_gc_run_operation_threshold, 0]),
+ rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
+end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
+ Testcase == variable_queue_fold ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, application, set_env,
+ [rabbit, queue_explicit_gc_run_operation_threshold, 1000]),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
index 7eb4d6fd04..9b6b0721a2 100644
--- a/test/worker_pool_SUITE.erl
+++ b/test/worker_pool_SUITE.erl
@@ -41,7 +41,7 @@ end_per_testcase(_, Config) ->
unlink(Pool),
exit(Pool, kill).
-run_code_synchronously(Config) ->
+run_code_synchronously(_) ->
Self = self(),
Test = make_ref(),
Sleep = 200,
@@ -63,7 +63,7 @@ run_code_synchronously(Config) ->
% Worker is a separate process
true = (Self /= Result).
-run_code_asynchronously(Config) ->
+run_code_asynchronously(_) ->
Self = self(),
Test = make_ref(),
Sleep = 200,
@@ -84,7 +84,7 @@ run_code_asynchronously(Config) ->
% Worker is a separate process
true = (Self /= Result).
-set_timeout(Config) ->
+set_timeout(_) ->
Self = self(),
Test = make_ref(),
Worker = worker_pool:submit(?POOL_NAME,
@@ -112,7 +112,7 @@ set_timeout(Config) ->
end.
-cancel_timeout(Config) ->
+cancel_timeout(_) ->
Self = self(),
Test = make_ref(),
Worker = worker_pool:submit(?POOL_NAME,
@@ -146,7 +146,7 @@ cancel_timeout(Config) ->
after 0 -> ok
end.
-cancel_timeout_by_setting(Config) ->
+cancel_timeout_by_setting(_) ->
Self = self(),
Test = make_ref(),
Worker = worker_pool:submit(?POOL_NAME,
@@ -186,8 +186,3 @@ cancel_timeout_by_setting(Config) ->
receive {hello_reset, Worker, Test} -> ok
after 1000 -> exit(timeout_is_late)
end.
-
-
-
-
-