diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 111f263130..367cbb464e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -30,6 +30,9 @@ -export([start/1, stop/0]). +%% exported for parallel map +-export([add_vhost_msg_store/1]). + %% exported for testing only -export([start_msg_store/2, stop_msg_store/0, init/6]). @@ -38,6 +41,7 @@ -include_lib("stdlib/include/qlc.hrl"). -define(QUEUE_MIGRATION_BATCH_SIZE, 100). +-define(MSG_STORE_RESTORE_BATCH_SIZE, 100). %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on @@ -500,14 +504,17 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), %% Start message store for all known vhosts VHosts = rabbit_vhost:list(), - lists:foreach( - fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost) - end, - VHosts), + RestoreBatchSize = application:get_env(rabbit, msg_store_restore_batch_size, + ?MSG_STORE_RESTORE_BATCH_SIZE), + in_batches(RestoreBatchSize, + {rabbit_variable_queue, add_vhost_msg_store, []}, + VHosts), ok. +add_vhost_msg_store(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost). + stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). |
