summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-06-02 15:13:39 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-10-20 14:49:52 +0100
commitfdd7b871df020cd138623bd8bffab7077440cbd5 (patch)
tree4360dbb9d1197a213d4c46b19e3d0a79fa59a01d
parent460abf24db53b0137271cb24db41ec520fabc75b (diff)
downloadrabbitmq-server-git-fdd7b871df020cd138623bd8bffab7077440cbd5.tar.gz
Updated tests to support per-vhost message store
-rw-r--r--test/channel_operation_timeout_test_queue.erl42
-rw-r--r--test/unit_inbroker_SUITE.erl10
2 files changed, 34 insertions, 18 deletions
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 3a284c592b..07b1235672 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -230,12 +230,21 @@ stop() ->
ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]).
+ Refs, StartFunState]),
+ %% Start message store for all known vhosts
+ VHosts = rabbit_vhost:list(),
+ lists:foreach(
+ fun(VHost) ->
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
+ end,
+ VHosts),
+ ok.
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
@@ -254,22 +263,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback);
+ MsgOnDiskFun, AsyncCallback,
+ VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
@@ -278,11 +291,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback),
+ undefined, AsyncCallback,
+ VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -957,14 +971,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback).
+ Callback, VHost).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
+ rabbit_msg_store_vhost_sup:client_init(
+ MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end,
+ VHost).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index a82887d8df..569c7a88fa 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -468,10 +468,10 @@ on_disk_stop(Pid) ->
msg_store_client_init_capture(MsgStore, Ref) ->
Pid = spawn(fun on_disk_capture/0),
- {Pid, rabbit_msg_store:client_init(
+ {Pid, rabbit_msg_store_vhost_sup:client_init(
MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
Pid ! {on_disk, MsgIds}
- end, undefined)}.
+ end, undefined, <<"/">>)}.
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
@@ -548,14 +548,14 @@ test_msg_store_confirm_timer() ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
- MSCState = rabbit_msg_store:client_init(
+ MSCState = rabbit_msg_store_vhost_sup:client_init(
?PERSISTENT_MSG_STORE, Ref,
fun (MsgIds, _ActionTaken) ->
case gb_sets:is_member(MsgId, MsgIds) of
true -> Self ! on_disk;
false -> ok
end
- end, undefined),
+ end, undefined, <<"/">>),
ok = msg_store_write([MsgId], MSCState),
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
@@ -1424,7 +1424,7 @@ nop(_) -> ok.
nop(_, _) -> ok.
msg_store_client_init(MsgStore, Ref) ->
- rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+ rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(