summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl19
1 files changed, 13 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 111f263130..367cbb464e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -30,6 +30,9 @@
-export([start/1, stop/0]).
+%% exported for parallel map
+-export([add_vhost_msg_store/1]).
+
%% exported for testing only
-export([start_msg_store/2, stop_msg_store/0, init/6]).
@@ -38,6 +41,7 @@
-include_lib("stdlib/include/qlc.hrl").
-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+-define(MSG_STORE_RESTORE_BATCH_SIZE, 100).
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
@@ -500,14 +504,17 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
[?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]),
%% Start message store for all known vhosts
VHosts = rabbit_vhost:list(),
- lists:foreach(
- fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost)
- end,
- VHosts),
+ RestoreBatchSize = application:get_env(rabbit, msg_store_restore_batch_size,
+ ?MSG_STORE_RESTORE_BATCH_SIZE),
+ in_batches(RestoreBatchSize,
+ {rabbit_variable_queue, add_vhost_msg_store, []},
+ VHosts),
ok.
+add_vhost_msg_store(VHost) ->
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost).
+
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).