diff options
| author | Jean-Sébastien Pedron <jean-sebastien@rabbitmq.com> | 2017-07-20 18:37:19 +0100 |
|---|---|---|
| committer | Gerhard Lazu <gerhard@lazu.co.uk> | 2017-07-25 11:30:03 +0100 |
| commit | 7d0e49c6391fce19d651ad4658923d4b3d415aa6 (patch) | |
| tree | 157baa4205955a2e0472455aab5e1b47203a71e5 /src/gm.erl | |
| parent | 5f03dcc56cd77e6474aab10663860d34c2a8e462 (diff) | |
| download | rabbitmq-server-git-7d0e49c6391fce19d651ad4658923d4b3d415aa6.tar.gz | |
Run garbage collection in GM every 250ms
In high throughput scenarios, e.g. `basic.reject` or `basic.nack`,
messages which belong to a mirrored queue and are replicated within a GM
group, are quickly promoted to the old heap. This means that garbage
collection happens only when the Erlang VM is under memory pressure,
which might be too late. When a process is under pressure, garbage
collection slows it down even further, to the point of RabbitMQ nodes
running out of memory and crashing. To avoid this scenario, we want the
GM process to garbage collect binaries regularly, i.e. every 250ms. The
variable queue does the same for a similar reason:
rabbitmq/rabbitmq-server#289
Initially, we wanted to use the number of messages as the trigger for
garbage collection, but we soon discovered that different workloads
(e.g. small vs large messages) would result in unpredictable and
sub-optimal GC schedules.
Before setting `fullsweep_after` to 0, memory usage was 2x higher (400MB
vs 200MB) and throughput was 10% lower (18k vs 20k msg/s). With this
`spawn_opt` setting, the generational garbage collection algorithm is
disabled, meaning that all live data is copied at every garbage collection:
http://erlang.org/doc/man/erlang.html#spawn_opt-3
The RabbitMQ deployment used for testing this change:
* AWS, c4.2xlarge, bosh-aws-xen-hvm-ubuntu-trusty-go_agent 3421.11
* 3 RabbitMQ nodes running OTP 20.0.1
* 3 durable & auto-delete queues with 3 replicas each
* each queue master was defined on a different RabbitMQ node
* every RabbitMQ node was running 1 queue master & 2 queue slaves
* 1 consumer per queue with QOS 100
* 100 durable messages @ 1KiB each
* `basic.reject` operations
```
| Node | Message throughput | Memory usage |
| ------ | -------------------- | -------------- |
| rmq0 | 12K - 20K msg/s | 400 - 900 MB |
| rmq1 | 12K - 20K msg/s | 500 - 1000 MB |
| rmq2 | 12K - 20K msg/s | 500 - 800 MB |
```
[#148892851]
Signed-off-by: Gerhard Lazu <gerhard@rabbitmq.com>
Diffstat (limited to 'src/gm.erl')
| -rw-r--r-- | src/gm.erl | 29 |
1 file changed, 22 insertions, 7 deletions
diff --git a/src/gm.erl b/src/gm.erl index 5679fc30e5..cf3e217010 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -395,9 +395,8 @@ -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -416,6 +415,7 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, + force_gc_timer, txn_executor, shutting_down }). @@ -508,7 +508,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], + [{spawn_opt, [{fullsweep_after, 0}]}]). leave(Server) -> gen_server2:cast(Server, leave). @@ -551,6 +552,7 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, + force_gc_timer = undefined, txn_executor = TxnFun, shutting_down = false }}. @@ -707,6 +709,10 @@ handle_cast(leave, State) -> {stop, normal, State}. +handle_info(force_gc, State) -> + garbage_collect(), + noreply(State #state { force_gc_timer = undefined }); + handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); @@ -882,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. + {noreply, ensure_timers(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + {reply, Reply, ensure_timers(State), flush_timeout(State)}. + +ensure_timers(State) -> + ensure_force_gc_timer(ensure_broadcast_timer(State)). flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. 
+ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) + when is_reference(TRef) -> + State; +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), + State #state { force_gc_timer = TRef }. + ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; @@ -957,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], - broadcast_buffer_sz = 0}. - + broadcast_buffer_sz = 0 }. %% --------------------------------------------------------------------------- %% View construction and inspection |
