diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-01-05 15:59:52 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-01-05 15:59:52 +0100 |
| commit | e1bb001582da62718fc5339490813ab53045a719 (patch) | |
| tree | bcddace3e64cea0d18d435df1948afad5f95ef21 | |
| parent | 4e174559130bac052a269b52606b525280d6212f (diff) | |
| parent | 62d8e67571af1f5e90e1e5864c5f7ca101d2c0c2 (diff) | |
| download | rabbitmq-server-git-e1bb001582da62718fc5339490813ab53045a719.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | Makefile | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 53 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 12 | ||||
| -rw-r--r-- | test/worker_pool_SUITE.erl | 15 |
4 files changed, 49 insertions, 34 deletions
@@ -110,7 +110,8 @@ define PROJECT_ENV ]}, %% rabbitmq-server-973 - {lazy_queue_explicit_gc_run_operation_threshold, 250}, + {queue_explicit_gc_run_operation_threshold, 1000}, + {lazy_queue_explicit_gc_run_operation_threshold, 1000}, {background_gc_enabled, true}, {background_gc_target_interval, 60000} ] diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9dbd4fdbf2..5581143e69 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -447,21 +447,28 @@ %% rabbit_amqqueue_process need fairly fresh rates. -define(MSGS_PER_RATE_CALC, 100). - %% we define the garbage collector threshold -%% it needs to tune the GC calls inside `reduce_memory_use` -%% see: rabbitmq-server-973 and `maybe_execute_gc` function --define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250). --define(EXPLICIT_GC_RUN_OP_THRESHOLD, +%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection. +%% see: rabbitmq-server-973 and rabbitmq-server-964 +-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000). +-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode), case get(explicit_gc_run_operation_threshold) of undefined -> - Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold, - ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD), + Val = explicit_gc_run_operation_threshold_for_mode(Mode), put(explicit_gc_run_operation_threshold, Val), Val; Val -> Val end). +explicit_gc_run_operation_threshold_for_mode(Mode) -> + {Key, Fallback} = case Mode of + lazy -> {lazy_queue_explicit_gc_run_operation_threshold, + ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}; + _ -> {queue_explicit_gc_run_operation_threshold, + ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD} + end, + rabbit_misc:get_env(rabbit, Key, Fallback). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -637,27 +644,27 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, fun maybe_write_to_disk/4, State), - a(reduce_memory_use(maybe_update_rates(State1))). + a(maybe_reduce_memory_use(maybe_update_rates(State1))). batch_publish(Publishes, ChPid, Flow, State) -> {ChPid, Flow, State1} = lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), State2 = ui(State1), - a(reduce_memory_use(maybe_update_rates(State2))). + a(maybe_reduce_memory_use(maybe_update_rates(State2))). publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> {SeqId, State1} = publish_delivered1(Msg, MsgProps, ChPid, Flow, fun maybe_write_to_disk/4, State), - {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. + {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}. batch_publish_delivered(Publishes, ChPid, Flow, State) -> {ChPid, Flow, SeqIds, State1} = lists:foldl(fun batch_publish_delivered1/2, {ChPid, Flow, [], State}, Publishes), State2 = ui(State1), - {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. + {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}. discard(_MsgId, _ChPid, _Flow, State) -> State. @@ -761,7 +768,7 @@ requeue(AckTags, #vqstate { mode = default, {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, State2), MsgCount = length(MsgIds2), - {MsgIds2, a(reduce_memory_use( + {MsgIds2, a(maybe_reduce_memory_use( maybe_update_rates(ui( State3 #vqstate { delta = Delta1, q3 = Q3a, @@ -779,7 +786,7 @@ requeue(AckTags, #vqstate { mode = lazy, {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, State1), MsgCount = length(MsgIds1), - {MsgIds1, a(reduce_memory_use( + {MsgIds1, a(maybe_reduce_memory_use( maybe_update_rates(ui( State2 #vqstate { delta = Delta1, q3 = Q3a, @@ -829,7 +836,7 @@ set_ram_duration_target( (TargetRamCount =/= infinity andalso TargetRamCount1 >= TargetRamCount) of true -> State1; - false -> reduce_memory_use(State1) + false -> maybe_reduce_memory_use(State1) end). maybe_update_rates(State = #vqstate{ in_counter = InCount, @@ -911,7 +918,7 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -resume(State) -> a(reduce_memory_use(State)). +resume(State) -> a(maybe_reduce_memory_use(State)). msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> @@ -2364,12 +2371,12 @@ ifold(Fun, Acc, Its, State) -> %% Phase changes %%---------------------------------------------------------------------------- -maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) -> - case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of - true -> garbage_collect(), - State#vqstate{memory_reduction_run_count = 0}; - false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} - +maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount, + mode = Mode}) -> + case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of + true -> State1 = reduce_memory_use(State), + State1#vqstate{memory_reduction_run_count = 0}; + false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} end. reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> @@ -2384,7 +2391,6 @@ reduce_memory_use(State = #vqstate { out = AvgEgress, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> - State1 = #vqstate { q2 = Q2, q3 = Q3 } = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of 0 -> State; @@ -2444,7 +2450,8 @@ reduce_memory_use(State = #vqstate { S2 -> push_betas_to_deltas(S2, State1) end, - maybe_execute_gc(State3). + garbage_collect(), + State3. limit_ram_acks(0, State) -> {0, ui(State)}; diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index af86371fc2..91a3eb32a6 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -237,9 +237,21 @@ orelse Group =:= backing_queue_embed_limit_1024 -> end_per_group1(_, Config) -> Config. +init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbit, queue_explicit_gc_run_operation_threshold, 0]), + rabbit_ct_helpers:testcase_started(Config, Testcase); init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbit, queue_explicit_gc_run_operation_threshold, 1000]), + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl index 7eb4d6fd04..9b6b0721a2 100644 --- a/test/worker_pool_SUITE.erl +++ b/test/worker_pool_SUITE.erl @@ -41,7 +41,7 @@ end_per_testcase(_, Config) -> unlink(Pool), exit(Pool, kill). -run_code_synchronously(Config) -> +run_code_synchronously(_) -> Self = self(), Test = make_ref(), Sleep = 200, @@ -63,7 +63,7 @@ run_code_synchronously(Config) -> % Worker is a separate process true = (Self /= Result). -run_code_asynchronously(Config) -> +run_code_asynchronously(_) -> Self = self(), Test = make_ref(), Sleep = 200, @@ -84,7 +84,7 @@ run_code_asynchronously(Config) -> % Worker is a separate process true = (Self /= Result). -set_timeout(Config) -> +set_timeout(_) -> Self = self(), Test = make_ref(), Worker = worker_pool:submit(?POOL_NAME, @@ -112,7 +112,7 @@ set_timeout(Config) -> end. -cancel_timeout(Config) -> +cancel_timeout(_) -> Self = self(), Test = make_ref(), Worker = worker_pool:submit(?POOL_NAME, @@ -146,7 +146,7 @@ cancel_timeout(Config) -> after 0 -> ok end. -cancel_timeout_by_setting(Config) -> +cancel_timeout_by_setting(_) -> Self = self(), Test = make_ref(), Worker = worker_pool:submit(?POOL_NAME, @@ -186,8 +186,3 @@ cancel_timeout_by_setting(Config) -> receive {hello_reset, Worker, Test} -> ok after 1000 -> exit(timeout_is_late) end. - - - - - |
