diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-06-02 15:13:39 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-10-20 14:49:52 +0100 |
| commit | fdd7b871df020cd138623bd8bffab7077440cbd5 (patch) | |
| tree | 4360dbb9d1197a213d4c46b19e3d0a79fa59a01d | |
| parent | 460abf24db53b0137271cb24db41ec520fabc75b (diff) | |
| download | rabbitmq-server-git-fdd7b871df020cd138623bd8bffab7077440cbd5.tar.gz | |
Updated tests to support per-vhost message store
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 42 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 10 |
2 files changed, 34 insertions, 18 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 3a284c592b..07b1235672 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -230,12 +230,21 @@ stop() -> ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]). + Refs, StartFunState]), + %% Start message store for all known vhosts + VHosts = rabbit_vhost:list(), + lists:foreach( + fun(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) + end, + VHosts), + ok. stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), @@ -254,22 +263,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + MsgOnDiskFun, AsyncCallback, + VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -278,11 +291,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -957,14 +971,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). + rabbit_msg_store_vhost_sup:client_init( + MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index a82887d8df..569c7a88fa 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -468,10 +468,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store:client_init( + {Pid, rabbit_msg_store_vhost_sup:client_init( MsgStore, Ref, fun (MsgIds, _ActionTaken) -> Pid ! {on_disk, MsgIds} - end, undefined)}. + end, undefined, <<"/">>)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -548,14 +548,14 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store:client_init( + MSCState = rabbit_msg_store_vhost_sup:client_init( ?PERSISTENT_MSG_STORE, Ref, fun (MsgIds, _ActionTaken) -> case gb_sets:is_member(MsgId, MsgIds) of true -> Self ! on_disk; false -> ok end - end, undefined), + end, undefined, <<"/">>), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -1424,7 +1424,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( |
