summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-05-09 19:20:34 +0300
committerGitHub <noreply@github.com>2017-05-09 19:20:34 +0300
commitd1eecf2f4ba0f49899e44cb490e620df84990149 (patch)
treed6852481b240e8d7b9cfb29bda35fb7c24a25fc4 /test
parent2ee4ef2eab559ea1341019e514e9db3eacc99ba0 (diff)
parent4cdad4b2a2a29355a1cb02d8798c9c6eaf4c0b67 (diff)
downloadrabbitmq-server-git-d1eecf2f4ba0f49899e44cb490e620df84990149.tar.gz
Merge pull request #1158 from rabbitmq/rabbitmq-server-1146-full
Per-vhost supervisors.
Diffstat (limited to 'test')
-rw-r--r--test/backing_queue_SUITE.erl67
-rw-r--r--test/channel_operation_timeout_test_queue.erl108
-rw-r--r--test/crashing_queues_SUITE.erl12
3 files changed, 92 insertions, 95 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index e17b2afbf3..3ff215f497 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -21,10 +21,11 @@
-compile(export_all).
--define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(TIMEOUT, 30000).
+-define(VHOST, <<"/">>).
-define(VARIABLE_QUEUE_TESTCASES, [
variable_queue_dynamic_duration_change,
@@ -253,9 +254,9 @@ msg_store1(_Config) ->
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
%% stop and restart, preserving every other msg in 2nd half
- ok = rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start_msg_store(
- #{}, {fun ([]) -> finished;
+ ok = rabbit_variable_queue:stop_msg_store(?VHOST),
+ ok = rabbit_variable_queue:start_msg_store(?VHOST,
+ [], {fun ([]) -> finished;
([MsgId|MsgIdsTail])
when length(MsgIdsTail) rem 2 == 0 ->
{MsgId, 1, MsgIdsTail};
@@ -330,8 +331,8 @@ msg_store1(_Config) ->
passed.
restart_msg_store_empty() ->
- ok = rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start_msg_store(
+ ok = rabbit_variable_queue:stop_msg_store(?VHOST),
+ ok = rabbit_variable_queue:start_msg_store(?VHOST,
undefined, {fun (ok) -> finished end, ok}).
msg_id_bin(X) ->
@@ -376,10 +377,10 @@ on_disk_stop(Pid) ->
msg_store_client_init_capture(MsgStore, Ref) ->
Pid = spawn(fun on_disk_capture/0),
- {Pid, rabbit_msg_store_vhost_sup:client_init(
- MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
- Pid ! {on_disk, MsgIds}
- end, undefined, <<"/">>)}.
+ {Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref,
+ fun (MsgIds, _ActionTaken) ->
+ Pid ! {on_disk, MsgIds}
+ end, undefined)}.
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
@@ -456,14 +457,16 @@ test_msg_store_confirm_timer() ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
- MSCState = rabbit_msg_store_vhost_sup:client_init(
- ?PERSISTENT_MSG_STORE, Ref,
- fun (MsgIds, _ActionTaken) ->
- case gb_sets:is_member(MsgId, MsgIds) of
- true -> Self ! on_disk;
- false -> ok
- end
- end, undefined, <<"/">>),
+ MSCState = rabbit_vhost_msg_store:client_init(
+ ?VHOST,
+ ?PERSISTENT_MSG_STORE,
+ Ref,
+ fun (MsgIds, _ActionTaken) ->
+ case gb_sets:is_member(MsgId, MsgIds) of
+ true -> Self ! on_disk;
+ false -> ok
+ end
+ end, undefined),
ok = msg_store_write([MsgId], MSCState),
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
@@ -651,8 +654,8 @@ bq_queue_index1(_Config) ->
Qi8
end),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
passed.
@@ -672,8 +675,8 @@ bq_queue_index_props1(_Config) ->
Qi2
end),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
passed.
@@ -718,7 +721,7 @@ bq_queue_recover1(Config) ->
true, false, [], none, <<"acting-user">>),
publish_and_confirm(Q, <<>>, Count),
- SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid),
+ SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q),
true = is_pid(SupPid),
exit(SupPid, kill),
exit(QPid, kill),
@@ -726,8 +729,8 @@ bq_queue_recover1(Config) ->
receive {'DOWN', MRef, process, QPid, _Info} -> ok
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
- rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ rabbit_amqqueue:stop(?VHOST),
+ rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
@@ -1275,14 +1278,14 @@ init_test_queue(QName) ->
Res.
restart_test_queue(Qi, QName) ->
- _ = rabbit_queue_index:terminate([], Qi),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([QName]),
+ _ = rabbit_queue_index:terminate(?VHOST, [], Qi),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, [QName]),
init_test_queue(QName).
empty_test_queue(QName) ->
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
{0, 0, Qi} = init_test_queue(QName),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
@@ -1337,7 +1340,7 @@ nop(_) -> ok.
nop(_, _) -> ok.
msg_store_client_init(MsgStore, Ref) ->
- rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
+ rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined, undefined).
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 10d97726f4..09d7ab4e95 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -28,10 +28,10 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
--export([start/1, stop/0]).
+-export([start/2, stop/1]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/3, stop_msg_store/1, init/6]).
%%----------------------------------------------------------------------------
%% This test backing queue follows the variable queue implementation, with
@@ -87,7 +87,8 @@
io_batch_size,
%% default queue or lazy queue
- mode
+ mode,
+ virtual_host
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -111,10 +112,11 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
+-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -184,7 +186,8 @@
disk_write_count :: non_neg_integer(),
io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy' }.
+ mode :: 'default' | 'lazy',
+ virtual_host :: rabbit_types:vhost() }.
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
@@ -213,55 +216,39 @@
%% Public API
%%----------------------------------------------------------------------------
-start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
+start(VHost, DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
%% Group recovery terms by vhost.
- {[], VhostRefs} = lists:foldl(
- fun
- %% We need to skip a queue name
- (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
- {QNames, VhostRefs};
- (Terms, {[QueueName | QNames], VhostRefs}) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {QNames, VhostRefs};
- Ref ->
- #resource{virtual_host = VHost} = QueueName,
- Refs = case maps:find(VHost, VhostRefs) of
- {ok, Val} -> Val;
- error -> []
- end,
- {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
- end
- end,
- {DurableQueues, #{}},
- AllTerms),
- start_msg_store(VhostRefs, StartFunState),
+ ClientRefs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
-stop() ->
- ok = stop_msg_store(),
- ok = rabbit_queue_index:stop().
-
-start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
- undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]),
- %% Start message store for all known vhosts
- VHosts = rabbit_vhost:list(),
- lists:foreach(
- fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
- end,
- VHosts),
+stop(VHost) ->
+ ok = stop_msg_store(VHost),
+ ok = rabbit_queue_index:stop(VHost).
+
+start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
+ rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?TRANSIENT_MSG_STORE,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?PERSISTENT_MSG_STORE,
+ Refs,
+ StartFunState),
+ rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
+
+stop_msg_store(VHost) ->
+ rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
+ rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
ok.
-stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(Queue, Recover, Callback) ->
init(
@@ -284,7 +271,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost);
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
@@ -309,10 +296,10 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
+ rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient).
+ PersistentClient, TransientClient, VHost).
process_recovery_terms(Terms=non_clean_shutdown) ->
{rabbit_guid:gen(), Terms};
@@ -326,7 +313,8 @@ terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
persistent_bytes = PBytes,
index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} } =
+ msg_store_clients = {MSCStateP, MSCStateT},
+ virtual_host = VHost } =
purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
@@ -338,7 +326,7 @@ terminate(_Reason, State) ->
{persistent_count, PCount},
{persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
+ VHost, Terms, IndexState),
msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
@@ -990,10 +978,9 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store_vhost_sup:client_init(
+ rabbit_vhost_msg_store:client_init(VHost,
MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end,
- VHost).
+ fun () -> Callback(?MODULE, CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -1084,7 +1071,7 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient) ->
+ PersistentClient, TransientClient, VHost) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{DeltaCount1, DeltaBytes1} =
@@ -1148,7 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
- mode = default },
+ mode = default,
+ virtual_host = VHost },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 457a4110fa..6e78c1579f 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -218,12 +218,18 @@ kill_queue(Node, QName) ->
queue_pid(Node, QName) ->
#amqqueue{pid = QPid,
- state = State} = lookup(Node, QName),
+ state = State,
+ name = #resource{virtual_host = VHost}} = lookup(Node, QName),
case State of
- crashed -> case sup_child(Node, rabbit_amqqueue_sup_sup) of
+ crashed ->
+ case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
+ {error, {queue_supervisor_not_found, Result}} -> {error, no_sup};
+ {ok, SPid} ->
+ case sup_child(Node, SPid) of
{ok, _} -> QPid; %% restarting
{error, no_child} -> crashed %% given up
- end;
+ end
+ end;
_ -> QPid
end.