diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-05-09 19:20:34 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-05-09 19:20:34 +0300 |
| commit | d1eecf2f4ba0f49899e44cb490e620df84990149 (patch) | |
| tree | d6852481b240e8d7b9cfb29bda35fb7c24a25fc4 /test | |
| parent | 2ee4ef2eab559ea1341019e514e9db3eacc99ba0 (diff) | |
| parent | 4cdad4b2a2a29355a1cb02d8798c9c6eaf4c0b67 (diff) | |
| download | rabbitmq-server-git-d1eecf2f4ba0f49899e44cb490e620df84990149.tar.gz | |
Merge pull request #1158 from rabbitmq/rabbitmq-server-1146-full
Per-vhost supervisors.
Diffstat (limited to 'test')
| -rw-r--r-- | test/backing_queue_SUITE.erl | 67 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 108 | ||||
| -rw-r--r-- | test/crashing_queues_SUITE.erl | 12 |
3 files changed, 92 insertions, 95 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index e17b2afbf3..3ff215f497 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -21,10 +21,11 @@ -compile(export_all). --define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). -define(TIMEOUT, 30000). +-define(VHOST, <<"/">>). -define(VARIABLE_QUEUE_TESTCASES, [ variable_queue_dynamic_duration_change, @@ -253,9 +254,9 @@ msg_store1(_Config) -> MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half - ok = rabbit_variable_queue:stop_msg_store(), - ok = rabbit_variable_queue:start_msg_store( - #{}, {fun ([]) -> finished; + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, + [], {fun ([]) -> finished; ([MsgId|MsgIdsTail]) when length(MsgIdsTail) rem 2 == 0 -> {MsgId, 1, MsgIdsTail}; @@ -330,8 +331,8 @@ msg_store1(_Config) -> passed. restart_msg_store_empty() -> - ok = rabbit_variable_queue:stop_msg_store(), - ok = rabbit_variable_queue:start_msg_store( + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, undefined, {fun (ok) -> finished end, ok}). msg_id_bin(X) -> @@ -376,10 +377,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store_vhost_sup:client_init( - MsgStore, Ref, fun (MsgIds, _ActionTaken) -> - Pid ! {on_disk, MsgIds} - end, undefined, <<"/">>)}. + {Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, + fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -456,14 +457,16 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store_vhost_sup:client_init( - ?PERSISTENT_MSG_STORE, Ref, - fun (MsgIds, _ActionTaken) -> - case gb_sets:is_member(MsgId, MsgIds) of - true -> Self ! on_disk; - false -> ok - end - end, undefined, <<"/">>), + MSCState = rabbit_vhost_msg_store:client_init( + ?VHOST, + ?PERSISTENT_MSG_STORE, + Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -651,8 +654,8 @@ bq_queue_index1(_Config) -> Qi8 end), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), passed. @@ -672,8 +675,8 @@ bq_queue_index_props1(_Config) -> Qi2 end), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), passed. @@ -718,7 +721,7 @@ bq_queue_recover1(Config) -> true, false, [], none, <<"acting-user">>), publish_and_confirm(Q, <<>>, Count), - SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid), + SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q), true = is_pid(SupPid), exit(SupPid, kill), exit(QPid, kill), @@ -726,8 +729,8 @@ bq_queue_recover1(Config) -> receive {'DOWN', MRef, process, QPid, _Info} -> ok after 10000 -> exit(timeout_waiting_for_queue_death) end, - rabbit_amqqueue:stop(), - rabbit_amqqueue:start(rabbit_amqqueue:recover()), + rabbit_amqqueue:stop(?VHOST), + rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, @@ -1275,14 +1278,14 @@ init_test_queue(QName) -> Res. restart_test_queue(Qi, QName) -> - _ = rabbit_queue_index:terminate([], Qi), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([QName]), + _ = rabbit_queue_index:terminate(?VHOST, [], Qi), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, [QName]), init_test_queue(QName). empty_test_queue(QName) -> - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), {0, 0, Qi} = init_test_queue(QName), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. @@ -1337,7 +1340,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). + rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined, undefined). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 10d97726f4..09d7ab4e95 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -28,10 +28,10 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, multiple_routing_keys/0]). --export([start/1, stop/0]). +-export([start/2, stop/1]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/3, stop_msg_store/1, init/6]). %%---------------------------------------------------------------------------- %% This test backing queue follows the variable queue implementation, with @@ -87,7 +87,8 @@ io_batch_size, %% default queue or lazy queue - mode + mode, + virtual_host }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -111,10 +112,11 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). +-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -184,7 +186,8 @@ disk_write_count :: non_neg_integer(), io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy' }. + mode :: 'default' | 'lazy', + virtual_host :: rabbit_types:vhost() }. %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. @@ -213,55 +216,39 @@ %% Public API %%---------------------------------------------------------------------------- -start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), +start(VHost, DurableQueues) -> + {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), %% Group recovery terms by vhost. - {[], VhostRefs} = lists:foldl( - fun - %% We need to skip a queue name - (non_clean_shutdown, {[_|QNames], VhostRefs}) -> - {QNames, VhostRefs}; - (Terms, {[QueueName | QNames], VhostRefs}) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {QNames, VhostRefs}; - Ref -> - #resource{virtual_host = VHost} = QueueName, - Refs = case maps:find(VHost, VhostRefs) of - {ok, Val} -> Val; - error -> [] - end, - {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} - end - end, - {DurableQueues, #{}}, - AllTerms), - start_msg_store(VhostRefs, StartFunState), + ClientRefs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. -stop() -> - ok = stop_msg_store(), - ok = rabbit_queue_index:stop(). - -start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), - undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]), - %% Start message store for all known vhosts - VHosts = rabbit_vhost:list(), - lists:foreach( - fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) - end, - VHosts), +stop(VHost) -> + ok = stop_msg_store(VHost), + ok = rabbit_queue_index:stop(VHost). + +start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> + rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?TRANSIENT_MSG_STORE, + undefined, + ?EMPTY_START_FUN_STATE), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?PERSISTENT_MSG_STORE, + Refs, + StartFunState), + rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + +stop_msg_store(VHost) -> + rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), + rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), ok. -stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(Queue, Recover, Callback) -> init( @@ -284,7 +271,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, @@ -309,10 +296,10 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), + rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient). + PersistentClient, TransientClient, VHost). process_recovery_terms(Terms=non_clean_shutdown) -> {rabbit_guid:gen(), Terms}; @@ -326,7 +313,8 @@ terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, persistent_bytes = PBytes, index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} } = + msg_store_clients = {MSCStateP, MSCStateT}, + virtual_host = VHost } = purge_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; @@ -338,7 +326,7 @@ terminate(_Reason, State) -> {persistent_count, PCount}, {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), + VHost, Terms, IndexState), msg_store_clients = undefined }). %% the only difference between purge and delete is that delete also @@ -990,10 +978,9 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store_vhost_sup:client_init( + rabbit_vhost_msg_store:client_init(VHost, MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end, - VHost). + fun () -> Callback(?MODULE, CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -1084,7 +1071,7 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient) -> + PersistentClient, TransientClient, VHost) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {DeltaCount1, DeltaBytes1} = @@ -1148,7 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, - mode = default }, + mode = default, + virtual_host = VHost }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 457a4110fa..6e78c1579f 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -218,12 +218,18 @@ kill_queue(Node, QName) -> queue_pid(Node, QName) -> #amqqueue{pid = QPid, - state = State} = lookup(Node, QName), + state = State, + name = #resource{virtual_host = VHost}} = lookup(Node, QName), case State of - crashed -> case sup_child(Node, rabbit_amqqueue_sup_sup) of + crashed -> + case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of + {error, {queue_supervisor_not_found, Result}} -> {error, no_sup}; + {ok, SPid} -> + case sup_child(Node, SPid) of {ok, _} -> QPid; %% restarting {error, no_child} -> crashed %% given up - end; + end + end; _ -> QPid end. |
