diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-12-22 11:55:33 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-12-22 11:55:33 +0000 |
| commit | 5282534220029eaaf138542eb15b16fa96afde82 (patch) | |
| tree | 3c41564f2f54e0a2c029de89658f1938d572f84e /src | |
| parent | 1403596717839c48f95903dc37058c2146878f9f (diff) | |
| download | rabbitmq-server-git-5282534220029eaaf138542eb15b16fa96afde82.tar.gz | |
Recover vhosts message stores in parallel after a crash
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 111f263130..367cbb464e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -30,6 +30,9 @@ -export([start/1, stop/0]). +%% exported for parallel map +-export([add_vhost_msg_store/1]). + %% exported for testing only -export([start_msg_store/2, stop_msg_store/0, init/6]). @@ -38,6 +41,7 @@ -include_lib("stdlib/include/qlc.hrl"). -define(QUEUE_MIGRATION_BATCH_SIZE, 100). +-define(MSG_STORE_RESTORE_BATCH_SIZE, 100). %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on @@ -500,14 +504,17 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), %% Start message store for all known vhosts VHosts = rabbit_vhost:list(), - lists:foreach( - fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost) - end, - VHosts), + RestoreBatchSize = application:get_env(rabbit, msg_store_restore_batch_size, + ?MSG_STORE_RESTORE_BATCH_SIZE), + in_batches(RestoreBatchSize, + {rabbit_variable_queue, add_vhost_msg_store, []}, + VHosts), ok. +add_vhost_msg_store(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost). + stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). |
