diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-12-24 04:09:12 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-12-24 04:09:12 +0300 |
| commit | fdf783f9accd0b7c914b17907c0d7528750fb961 (patch) | |
| tree | 816a802958e13515f158b8d52d0f6dfd69dc839d /test | |
| parent | 72f7c8e70028209ee1c58074c087b4d0cb901e6c (diff) | |
| parent | 1d4e939b84b56dd793bd265bec074a392cdf194b (diff) | |
| download | rabbitmq-server-git-fdf783f9accd0b7c914b17907c0d7528750fb961.tar.gz | |
Merge pull request #766 from rabbitmq/rabbitmq-server-567
Per vhost message store
Diffstat (limited to 'test')
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 75 | ||||
| -rw-r--r-- | test/per_vhost_msg_store_SUITE.erl | 254 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 22 |
3 files changed, 317 insertions, 34 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 4407a24e7f..124fda47b1 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -111,8 +111,8 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). @@ -215,14 +215,27 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end, + {DurableQueues, #{}}, + AllTerms), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> @@ -230,12 +243,21 @@ stop() -> ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]). + Refs, StartFunState]), + %% Start message store for all known vhosts + VHosts = rabbit_vhost:list(), + lists:foreach( + fun(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) + end, + VHosts), + ok. stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), @@ -254,22 +276,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + MsgOnDiskFun, AsyncCallback, + VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -278,11 +304,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -957,14 +984,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). + rabbit_msg_store_vhost_sup:client_init( + MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( diff --git a/test/per_vhost_msg_store_SUITE.erl b/test/per_vhost_msg_store_SUITE.erl new file mode 100644 index 0000000000..4d88c84b7e --- /dev/null +++ b/test/per_vhost_msg_store_SUITE.erl @@ -0,0 +1,254 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_msg_store_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(MSGS_COUNT, 100). + +all() -> + [ + publish_to_different_dirs, + storage_deleted_on_vhost_delete, + single_vhost_storage_delete_is_safe + ]. + + + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, ?MODULE}]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, [{queue_index_embed_msgs_below, 100}]}), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(_, Config) -> + Vhost1 = <<"vhost1">>, + Vhost2 = <<"vhost2">>, + rabbit_ct_broker_helpers:add_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:add_vhost(Config, Vhost2), + Chan1 = open_channel(Vhost1, Config), + Chan2 = open_channel(Vhost2, Config), + rabbit_ct_helpers:set_config( + Config, + [{vhost1, Vhost1}, {vhost2, Vhost2}, + {channel1, Chan1}, {channel2, Chan2}]). + +end_per_testcase(single_vhost_storage_delete_is_safe, Config) -> + Config; +end_per_testcase(_, Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost2), + Config. + +publish_to_different_dirs(Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + FolderSize1 = get_folder_size(Vhost1, Config), + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel1, Queue1), + %% First storage increased + FolderSize11 = get_folder_size(Vhost1, Config), + true = (FolderSize1 < FolderSize11), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel1, Queue1), + %% First storage increased + FolderSize12 = get_folder_size(Vhost1, Config), + true = (FolderSize11 < FolderSize12), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel2, Queue2), + %% First storage increased + FolderSize21 = get_folder_size(Vhost2, Config), + true = (FolderSize2 < FolderSize21), + %% Second storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel2, Queue2), + %% Second storage increased + FolderSize22 = get_folder_size(Vhost2, Config), + true = (FolderSize21 < FolderSize22), + %% First storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config). + +storage_deleted_on_vhost_delete(Config) -> + Vhost1 = ?config(vhost1, Config), + Channel1 = ?config(channel1, Config), + Queue1 = declare_durable_queue(Channel1), + FolderSize = get_global_folder_size(Config), + + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + FolderSizeAfterPublish = get_global_folder_size(Config), + + %% Total storage size increased + true = (FolderSize < FolderSizeAfterPublish), + + ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + + %% Total memory reduced + FolderSizeAfterDelete = get_global_folder_size(Config), + true = (FolderSizeAfterPublish > FolderSizeAfterDelete), + + %% There is no Vhost1 folder + 0 = get_folder_size(Vhost1, Config). + + +single_vhost_storage_delete_is_safe(Config) -> +ct:pal("Start test 3", []), + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + + %% Publish messages to both stores + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + publish_persistent_messages(index, Channel2, Queue2), + publish_persistent_messages(store, Channel2, Queue2), + + queue_is_not_empty(Channel2, Queue2), + % Vhost2Dir = vhost_dir(Vhost2, Config), + % [StoreFile] = filelib:wildcard(binary_to_list(filename:join([Vhost2Dir, "msg_store_persistent_*", "0.rdq"]))), + % ct:pal("Store file ~p~n", [file:read_file(StoreFile)]). +% ok. + rabbit_ct_broker_helpers:stop_broker(Config, 0), + delete_vhost_data(Vhost1, Config), + rabbit_ct_broker_helpers:start_broker(Config, 0), + + Channel11 = open_channel(Vhost1, Config), + Channel21 = open_channel(Vhost2, Config), + + %% There are no Vhost1 messages + queue_is_empty(Channel11, Queue1), + + %% But Vhost2 messages are in place + queue_is_not_empty(Channel21, Queue2), + consume_messages(index, Channel21, Queue2), + consume_messages(store, Channel21, Queue2). + +declare_durable_queue(Channel) -> + QName = list_to_binary(erlang:ref_to_list(make_ref())), + #'queue.declare_ok'{queue = QName} = + amqp_channel:call(Channel, + #'queue.declare'{queue = QName, durable = true}), + QName. + +publish_persistent_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + amqp_channel:call(Channel, #'confirm.select'{}), + [amqp_channel:call(Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = MessagePayload}) + || _ <- lists:seq(1, ?MSGS_COUNT)], + amqp_channel:wait_for_confirms(Channel). + + +get_folder_size(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + folder_size(Dir). + +folder_size(Dir) -> + filelib:fold_files(Dir, ".*", true, + fun(F,Acc) -> filelib:file_size(F) + Acc end, 0). + +get_global_folder_size(Config) -> + BaseDir = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, dir, []), + folder_size(BaseDir). + +vhost_dir(Vhost, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost, msg_store_dir_path, [Vhost]). + +delete_vhost_data(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + rabbit_file:recursive_delete([Dir]). + +queue_is_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = 0} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}). + +queue_is_not_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = MsgCount} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}), + ExpectedCount = ?MSGS_COUNT * 2, + ExpectedCount = MsgCount. + +consume_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + lists:foreach( + fun(I) -> + ct:pal("Consume message ~p~n ~p~n", [I, MessagePayload]), + {#'basic.get_ok'{}, Content} = + amqp_channel:call(Channel, + #'basic.get'{queue = Queue, + no_ack = true}), + #amqp_msg{payload = MessagePayload} = Content + end, + lists:seq(1, ?MSGS_COUNT)), + ok. + +open_channel(Vhost, Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, Conn} = amqp_connection:start( + #amqp_params_direct{node = Node, virtual_host = Vhost}), + {ok, Chan} = amqp_connection:open_channel(Conn), + Chan. diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index a7f2ef4603..98fb6ac3c2 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -22,8 +22,8 @@ -compile(export_all). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). @@ -347,7 +347,7 @@ msg_store1(_Config) -> %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( - [], {fun ([]) -> finished; + #{}, {fun ([]) -> finished; ([MsgId|MsgIdsTail]) when length(MsgIdsTail) rem 2 == 0 -> {MsgId, 1, MsgIdsTail}; @@ -468,10 +468,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store:client_init( + {Pid, rabbit_msg_store_vhost_sup:client_init( MsgStore, Ref, fun (MsgIds, _ActionTaken) -> Pid ! {on_disk, MsgIds} - end, undefined)}. + end, undefined, <<"/">>)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -548,14 +548,14 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store:client_init( + MSCState = rabbit_msg_store_vhost_sup:client_init( ?PERSISTENT_MSG_STORE, Ref, fun (MsgIds, _ActionTaken) -> case gb_sets:is_member(MsgId, MsgIds) of true -> Self ! on_disk; false -> ok end - end, undefined), + end, undefined, <<"/">>), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -1424,7 +1424,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( @@ -1842,7 +1842,7 @@ log_management(Config) -> ?MODULE, log_management1, [Config]). log_management1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", ok = test_logs_working([LogFile]), @@ -1917,7 +1917,7 @@ log_management_during_startup(Config) -> ?MODULE, log_management_during_startup1, [Config]). log_management_during_startup1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", %% start application with simple tty logging @@ -2002,7 +2002,7 @@ externally_rotated_logs_are_automatically_reopened(Config) -> ?MODULE, externally_rotated_logs_are_automatically_reopened1, [Config]). externally_rotated_logs_are_automatically_reopened1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), %% Make sure log file is opened ok = test_logs_working([LogFile]), |
