summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-12-24 04:09:12 +0300
committerGitHub <noreply@github.com>2016-12-24 04:09:12 +0300
commitfdf783f9accd0b7c914b17907c0d7528750fb961 (patch)
tree816a802958e13515f158b8d52d0f6dfd69dc839d /test
parent72f7c8e70028209ee1c58074c087b4d0cb901e6c (diff)
parent1d4e939b84b56dd793bd265bec074a392cdf194b (diff)
downloadrabbitmq-server-git-fdf783f9accd0b7c914b17907c0d7528750fb961.tar.gz
Merge pull request #766 from rabbitmq/rabbitmq-server-567
Per vhost message store
Diffstat (limited to 'test')
-rw-r--r--test/channel_operation_timeout_test_queue.erl75
-rw-r--r--test/per_vhost_msg_store_SUITE.erl254
-rw-r--r--test/unit_inbroker_SUITE.erl22
3 files changed, 317 insertions, 34 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 4407a24e7f..124fda47b1 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -111,8 +111,8 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
-define(QUEUE, lqueue).
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
@@ -215,14 +215,27 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
- start_msg_store(
- [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
+ %% Group recovery terms by vhost.
+ {[], VhostRefs} = lists:foldl(
+ fun
+ %% We need to skip a queue name
+ (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
+ {QNames, VhostRefs};
+ (Terms, {[QueueName | QNames], VhostRefs}) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {QNames, VhostRefs};
+ Ref ->
+ #resource{virtual_host = VHost} = QueueName,
+ Refs = case maps:find(VHost, VhostRefs) of
+ {ok, Val} -> Val;
+ error -> []
+ end,
+ {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
+ end
+ end,
+ {DurableQueues, #{}},
+ AllTerms),
+ start_msg_store(VhostRefs, StartFunState),
{ok, AllTerms}.
stop() ->
@@ -230,12 +243,21 @@ stop() ->
ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]).
+ Refs, StartFunState]),
+ %% Start message store for all known vhosts
+ VHosts = rabbit_vhost:list(),
+ lists:foreach(
+ fun(VHost) ->
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
+ end,
+ VHosts),
+ ok.
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
@@ -254,22 +276,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback);
+ MsgOnDiskFun, AsyncCallback,
+ VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
@@ -278,11 +304,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback),
+ undefined, AsyncCallback,
+ VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -957,14 +984,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback).
+ Callback, VHost).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
+ rabbit_msg_store_vhost_sup:client_init(
+ MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end,
+ VHost).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
diff --git a/test/per_vhost_msg_store_SUITE.erl b/test/per_vhost_msg_store_SUITE.erl
new file mode 100644
index 0000000000..4d88c84b7e
--- /dev/null
+++ b/test/per_vhost_msg_store_SUITE.erl
@@ -0,0 +1,254 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(per_vhost_msg_store_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(MSGS_COUNT, 100).
+
+all() ->
+ [
+ publish_to_different_dirs,
+ storage_deleted_on_vhost_delete,
+ single_vhost_storage_delete_is_safe
+ ].
+
+
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ Config2 = rabbit_ct_helpers:merge_app_env(
+ Config1,
+ {rabbit, [{queue_index_embed_msgs_below, 100}]}),
+ rabbit_ct_helpers:run_setup_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(_, Config) ->
+ Vhost1 = <<"vhost1">>,
+ Vhost2 = <<"vhost2">>,
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost1),
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost2),
+ Chan1 = open_channel(Vhost1, Config),
+ Chan2 = open_channel(Vhost2, Config),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{vhost1, Vhost1}, {vhost2, Vhost2},
+ {channel1, Chan1}, {channel2, Chan2}]).
+
+end_per_testcase(single_vhost_storage_delete_is_safe, Config) ->
+ Config;
+end_per_testcase(_, Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost2),
+ Config.
+
+publish_to_different_dirs(Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ Channel1 = ?config(channel1, Config),
+ Channel2 = ?config(channel2, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ Queue2 = declare_durable_queue(Channel2),
+ FolderSize1 = get_folder_size(Vhost1, Config),
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a queue index
+ publish_persistent_messages(index, Channel1, Queue1),
+ %% First storage increased
+ FolderSize11 = get_folder_size(Vhost1, Config),
+ true = (FolderSize1 < FolderSize11),
+ %% Second storage didn't increased
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a message store
+ publish_persistent_messages(store, Channel1, Queue1),
+ %% First storage increased
+ FolderSize12 = get_folder_size(Vhost1, Config),
+ true = (FolderSize11 < FolderSize12),
+ %% Second storage didn't increased
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a queue index
+ publish_persistent_messages(index, Channel2, Queue2),
+ %% First storage increased
+ FolderSize21 = get_folder_size(Vhost2, Config),
+ true = (FolderSize2 < FolderSize21),
+ %% Second storage didn't increased
+ FolderSize12 = get_folder_size(Vhost1, Config),
+
+ %% Publish message to a message store
+ publish_persistent_messages(store, Channel2, Queue2),
+ %% Second storage increased
+ FolderSize22 = get_folder_size(Vhost2, Config),
+ true = (FolderSize21 < FolderSize22),
+ %% First storage didn't increased
+ FolderSize12 = get_folder_size(Vhost1, Config).
+
+storage_deleted_on_vhost_delete(Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Channel1 = ?config(channel1, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ FolderSize = get_global_folder_size(Config),
+
+ publish_persistent_messages(index, Channel1, Queue1),
+ publish_persistent_messages(store, Channel1, Queue1),
+ FolderSizeAfterPublish = get_global_folder_size(Config),
+
+ %% Total storage size increased
+ true = (FolderSize < FolderSizeAfterPublish),
+
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1),
+
+ %% Total memory reduced
+ FolderSizeAfterDelete = get_global_folder_size(Config),
+ true = (FolderSizeAfterPublish > FolderSizeAfterDelete),
+
+ %% There is no Vhost1 folder
+ 0 = get_folder_size(Vhost1, Config).
+
+
+single_vhost_storage_delete_is_safe(Config) ->
+ct:pal("Start test 3", []),
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ Channel1 = ?config(channel1, Config),
+ Channel2 = ?config(channel2, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ Queue2 = declare_durable_queue(Channel2),
+
+ %% Publish messages to both stores
+ publish_persistent_messages(index, Channel1, Queue1),
+ publish_persistent_messages(store, Channel1, Queue1),
+ publish_persistent_messages(index, Channel2, Queue2),
+ publish_persistent_messages(store, Channel2, Queue2),
+
+ queue_is_not_empty(Channel2, Queue2),
+ % Vhost2Dir = vhost_dir(Vhost2, Config),
+ % [StoreFile] = filelib:wildcard(binary_to_list(filename:join([Vhost2Dir, "msg_store_persistent_*", "0.rdq"]))),
+ % ct:pal("Store file ~p~n", [file:read_file(StoreFile)]).
+% ok.
+ rabbit_ct_broker_helpers:stop_broker(Config, 0),
+ delete_vhost_data(Vhost1, Config),
+ rabbit_ct_broker_helpers:start_broker(Config, 0),
+
+ Channel11 = open_channel(Vhost1, Config),
+ Channel21 = open_channel(Vhost2, Config),
+
+ %% There are no Vhost1 messages
+ queue_is_empty(Channel11, Queue1),
+
+ %% But Vhost2 messages are in place
+ queue_is_not_empty(Channel21, Queue2),
+ consume_messages(index, Channel21, Queue2),
+ consume_messages(store, Channel21, Queue2).
+
+declare_durable_queue(Channel) ->
+ QName = list_to_binary(erlang:ref_to_list(make_ref())),
+ #'queue.declare_ok'{queue = QName} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{queue = QName, durable = true}),
+ QName.
+
+publish_persistent_messages(Storage, Channel, Queue) ->
+ MessagePayload = case Storage of
+ index -> binary:copy(<<"=">>, 50);
+ store -> binary:copy(<<"-">>, 150)
+ end,
+ amqp_channel:call(Channel, #'confirm.select'{}),
+ [amqp_channel:call(Channel,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = MessagePayload})
+ || _ <- lists:seq(1, ?MSGS_COUNT)],
+ amqp_channel:wait_for_confirms(Channel).
+
+
+get_folder_size(Vhost, Config) ->
+ Dir = vhost_dir(Vhost, Config),
+ folder_size(Dir).
+
+folder_size(Dir) ->
+ filelib:fold_files(Dir, ".*", true,
+ fun(F,Acc) -> filelib:file_size(F) + Acc end, 0).
+
+get_global_folder_size(Config) ->
+ BaseDir = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, dir, []),
+ folder_size(BaseDir).
+
+vhost_dir(Vhost, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_vhost, msg_store_dir_path, [Vhost]).
+
+delete_vhost_data(Vhost, Config) ->
+ Dir = vhost_dir(Vhost, Config),
+ rabbit_file:recursive_delete([Dir]).
+
+queue_is_empty(Channel, Queue) ->
+ #'queue.declare_ok'{queue = Queue, message_count = 0} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{ queue = Queue,
+ durable = true,
+ passive = true}).
+
+queue_is_not_empty(Channel, Queue) ->
+ #'queue.declare_ok'{queue = Queue, message_count = MsgCount} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{ queue = Queue,
+ durable = true,
+ passive = true}),
+ ExpectedCount = ?MSGS_COUNT * 2,
+ ExpectedCount = MsgCount.
+
+consume_messages(Storage, Channel, Queue) ->
+ MessagePayload = case Storage of
+ index -> binary:copy(<<"=">>, 50);
+ store -> binary:copy(<<"-">>, 150)
+ end,
+ lists:foreach(
+ fun(I) ->
+ ct:pal("Consume message ~p~n ~p~n", [I, MessagePayload]),
+ {#'basic.get_ok'{}, Content} =
+ amqp_channel:call(Channel,
+ #'basic.get'{queue = Queue,
+ no_ack = true}),
+ #amqp_msg{payload = MessagePayload} = Content
+ end,
+ lists:seq(1, ?MSGS_COUNT)),
+ ok.
+
+open_channel(Vhost, Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, Conn} = amqp_connection:start(
+ #amqp_params_direct{node = Node, virtual_host = Vhost}),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Chan.
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index a7f2ef4603..98fb6ac3c2 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -22,8 +22,8 @@
-compile(export_all).
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
@@ -347,7 +347,7 @@ msg_store1(_Config) ->
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
- [], {fun ([]) -> finished;
+ #{}, {fun ([]) -> finished;
([MsgId|MsgIdsTail])
when length(MsgIdsTail) rem 2 == 0 ->
{MsgId, 1, MsgIdsTail};
@@ -468,10 +468,10 @@ on_disk_stop(Pid) ->
msg_store_client_init_capture(MsgStore, Ref) ->
Pid = spawn(fun on_disk_capture/0),
- {Pid, rabbit_msg_store:client_init(
+ {Pid, rabbit_msg_store_vhost_sup:client_init(
MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
Pid ! {on_disk, MsgIds}
- end, undefined)}.
+ end, undefined, <<"/">>)}.
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
@@ -548,14 +548,14 @@ test_msg_store_confirm_timer() ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
- MSCState = rabbit_msg_store:client_init(
+ MSCState = rabbit_msg_store_vhost_sup:client_init(
?PERSISTENT_MSG_STORE, Ref,
fun (MsgIds, _ActionTaken) ->
case gb_sets:is_member(MsgId, MsgIds) of
true -> Self ! on_disk;
false -> ok
end
- end, undefined),
+ end, undefined, <<"/">>),
ok = msg_store_write([MsgId], MSCState),
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
@@ -1424,7 +1424,7 @@ nop(_) -> ok.
nop(_, _) -> ok.
msg_store_client_init(MsgStore, Ref) ->
- rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+ rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
@@ -1842,7 +1842,7 @@ log_management(Config) ->
?MODULE, log_management1, [Config]).
log_management1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
ok = test_logs_working([LogFile]),
@@ -1917,7 +1917,7 @@ log_management_during_startup(Config) ->
?MODULE, log_management_during_startup1, [Config]).
log_management_during_startup1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
%% start application with simple tty logging
@@ -2002,7 +2002,7 @@ externally_rotated_logs_are_automatically_reopened(Config) ->
?MODULE, externally_rotated_logs_are_automatically_reopened1, [Config]).
externally_rotated_logs_are_automatically_reopened1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
%% Make sure log file is opened
ok = test_logs_working([LogFile]),