summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
authorJean-Sébastien Pedron <jean-sebastien@rabbitmq.com>2017-07-20 18:37:19 +0100
committerGerhard Lazu <gerhard@lazu.co.uk>2017-07-25 11:30:03 +0100
commit7d0e49c6391fce19d651ad4658923d4b3d415aa6 (patch)
tree157baa4205955a2e0472455aab5e1b47203a71e5 /src/gm.erl
parent5f03dcc56cd77e6474aab10663860d34c2a8e462 (diff)
downloadrabbitmq-server-git-7d0e49c6391fce19d651ad4658923d4b3d415aa6.tar.gz
Run garbage collection in GM every 250ms
In high throughput scenarios, e.g. `basic.reject` or `basic.nack`, messages which belong to a mirrored queue and are replicated within a GM group, are quickly promoted to the old heap. This means that garbage collection happens only when the Erlang VM is under memory pressure, which might be too late. When a process is under pressure, garbage collection slows it down even further, to the point of RabbitMQ nodes running out of memory and crashing. To avoid this scenario, We want the GM process to garbage collect binaries regularly, i.e. every 250ms. The variable queue does the same for a similar reason: rabbitmq/rabbitmq-server#289 Initially, we wanted to use the number of messages as the trigger for garbage collection, but we soon discovered that different workloads (e.g. small vs large messages) would result in unpredictable and sub-optimal GC schedules. Before setting `fullsweep_after` to 0, memory usage was 2x higher (400MB vs 200MB) and throughput was 0.1x lower (18k vs 20k). With this `spawn_opt` setting, the general collection algorithm is disabled, meaning that all live data is copied at every garbage collection: http://erlang.org/doc/man/erlang.html#spawn_opt-3 The RabbitMQ deployment used for testing this change: * AWS, c4.2xlarge, bosh-aws-xen-hvm-ubuntu-trusty-go_agent 3421.11 * 3 RabbitMQ nodes running OTP 20.0.1 * 3 durable & auto-delete queues with 3 replicas each * each queue master was defined on a different RabbitMQ node * every RabbitMQ node was running 1 queue master & 2 queue slaves * 1 consumer per queue with QOS 100 * 100 durable messages @ 1KiB each * `basic.reject` operations ``` | Node | Message throughput | Memory usage | | ------ | -------------------- | -------------- | | rmq0 | 12K - 20K msg/s | 400 - 900 MB | | rmq1 | 12K - 20K msg/s | 500 - 1000 MB | | rmq2 | 12K - 20K msg/s | 500 - 800 MB | ``` [#148892851] Signed-off-by: Gerhard Lazu <gerhard@rabbitmq.com>
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl29
1 files changed, 22 insertions, 7 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 5679fc30e5..cf3e217010 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -395,9 +395,8 @@
-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -416,6 +415,7 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
+ force_gc_timer,
txn_executor,
shutting_down
}).
@@ -508,7 +508,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
+ [{spawn_opt, [{fullsweep_after, 0}]}]).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -551,6 +552,7 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
+ force_gc_timer = undefined,
txn_executor = TxnFun,
shutting_down = false }}.
@@ -707,6 +709,10 @@ handle_cast(leave, State) ->
{stop, normal, State}.
+handle_info(force_gc, State) ->
+ garbage_collect(),
+ noreply(State #state { force_gc_timer = undefined });
+
handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -882,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {noreply, ensure_timers(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {reply, Reply, ensure_timers(State), flush_timeout(State)}.
+
+ensure_timers(State) ->
+ ensure_force_gc_timer(ensure_broadcast_timer(State)).
flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_) -> 0.
+ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
+ when is_reference(TRef) ->
+ State;
+ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
+ TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
+ State #state { force_gc_timer = TRef }.
+
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
@@ -957,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [],
- broadcast_buffer_sz = 0}.
-
+ broadcast_buffer_sz = 0 }.
%% ---------------------------------------------------------------------------
%% View construction and inspection