diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-07-31 19:16:53 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-31 19:16:53 +0300 |
| commit | 843199f62259bea5a27793cd2ded68804f30c1a6 (patch) | |
| tree | c41dd04b84e13eea7705e0004a9ccef5b2db09b1 | |
| parent | 98566cdd9c22f556c732b8c91059ca44f5a5ed34 (diff) | |
| parent | 1df9b98f35fddf5d6af47d79fbd015980b71cf8e (diff) | |
| download | rabbitmq-server-git-843199f62259bea5a27793cd2ded68804f30c1a6.tar.gz | |
Merge pull request #1309 from rabbitmq/rabbitmq-server-1303
Set queue state to 'stopped' when terminating.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 33 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 39 |
6 files changed, 63 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff57593374..5537634144 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,6 +40,7 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). -export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -255,6 +256,15 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +mark_local_durable_queues_stopped(VHost) -> + Qs = find_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(Q#amqqueue{ state = stopped }) + || Q = #amqqueue{ state = State } <- Qs, + State =/= stopped ] + end). + find_durable_queues(VHost) -> Node = node(), mnesia:async_dirty( @@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) -> E({absent, Q, timeout}); {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + {ok, Q = #amqqueue{state = stopped}} -> + %% The queue process was stopped by the supervisor + E({absent, Q, stopped}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); +info(Q = #amqqueue{ state = stopped }, Items) -> + info_down(Q, Items, stopped); info(#amqqueue{ pid = QPid }, Items) -> case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e43104de2..678f1136c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + [Q] = mnesia:read({rabbit_queue, QName}), + Q2 = Q#amqqueue{state = stopped}, + rabbit_amqqueue:store_queue(Q2) + end), + BQ:terminate(R, BQS) + end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 00a6607dfb..c69a27d57c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin, fun (not_found) -> {ok, 0}; ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), {ok, 0}; + ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index e3c815a727..f6e4a83daa 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -55,6 +55,7 @@ init([VHost]) -> timer:send_interval(Interval, check_vhost), {ok, VHost} catch _:Reason -> + rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" " Stacktrace ~p", [VHost, Reason, erlang:get_stacktrace()]), diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index 535725d585..71b74ea104 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -31,6 +31,7 @@ -export([list_queues_local/1 ,list_queues_offline/1 ,list_queues_online/1 + ,list_queues_stopped/1 ]). all() -> @@ -44,6 +45,7 @@ groups() -> [list_queues_local ,list_queues_online ,list_queues_offline + ,list_queues_stopped ]} ]. @@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) -> rabbit_ct_helpers:run_steps(Config1, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()); -end_per_group(global_parameters, Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()); end_per_group(_, Config) -> Config. +init_per_testcase(list_queues_stopped, Config0) -> + %% Start node 3 to crash it's queues + rabbit_ct_broker_helpers:start_node(Config0, 2), + %% Make vhost "down" on nodes 2 and 3 + rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>), + rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>), + + rabbit_ct_broker_helpers:stop_node(Config0, 2), + rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped); + init_per_testcase(Testcase, Config0) -> rabbit_ct_helpers:testcase_started(Config0, Testcase). @@ -134,6 +142,23 @@ list_queues_offline(Config) -> assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), ok. +list_queues_stopped(Config) -> + Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))), + Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))), + Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))), + + %% All queues are listed + ListedQueues = + [ {Name, State} + || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "state"]) ], + + [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ], + %% Node is running. Vhost is down + [ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ], + %% Node is not running. Vhost is down + [ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ]. + %%---------------------------------------------------------------------------- %% Helpers %%---------------------------------------------------------------------------- diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index a519d01af5..65aef8d410 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -161,7 +161,7 @@ vhost_failure_forces_connection_closure(Config) -> [_Conn2] = open_connections(Config, [{0, VHost2}]), ?assertEqual(1, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2), timer:sleep(200), ?assertEqual(0, count_connections_in(Config, VHost2)), @@ -181,7 +181,7 @@ dead_vhost_connection_refused(Config) -> ?assertEqual(0, count_connections_in(Config, VHost1)), ?assertEqual(0, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2), timer:sleep(200), [_Conn1] = open_connections(Config, [{0, VHost1}]), @@ -213,7 +213,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) -> [_Conn21] = open_connections(Config, [{1, VHost2}]), ?assertEqual(2, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, 0, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2), timer:sleep(200), %% Vhost2 connection on node 1 is still alive ?assertEqual(1, count_connections_in(Config, VHost2)), @@ -236,7 +236,7 @@ dead_vhost_connection_refused_on_failure_node(Config) -> ?assertEqual(0, count_connections_in(Config, VHost1)), ?assertEqual(0, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, 0, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2), timer:sleep(200), %% Can open connections to vhost1 on node 0 and 1 [_Conn10] = open_connections(Config, [{0, VHost1}]), @@ -257,37 +257,6 @@ dead_vhost_connection_refused_on_failure_node(Config) -> rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). -force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost). - -force_vhost_failure(Config, Node, VHost) -> - force_vhost_failure(Config, Node, VHost, 10). - -force_vhost_failure(_Config, _Node, VHost, 0) -> - error({failed_to_force_vhost_failure, no_more_attempts_left, VHost}); -force_vhost_failure(Config, Node, VHost, Attempts) -> - MessageStorePid = get_message_store_pid(Config, VHost), - rabbit_ct_broker_helpers:rpc(Config, Node, - erlang, exit, - [MessageStorePid, force_vhost_failure]), - %% Give it a time to fail - timer:sleep(200), - case rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_vhost_sup_sup, is_vhost_alive, - [VHost]) of - true -> force_vhost_failure(Config, Node, VHost, Attempts - 1); - false -> ok - end. - -get_message_store_pid(Config, VHost) -> - {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_vhost_sup_sup, vhost_sup, [VHost]), - Children = rabbit_ct_broker_helpers:rpc(Config, 0, - supervisor, which_children, - [VHostSup]), - [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children, - Name == msg_store_persistent], - MsgStorePid. - cluster_vhost_deletion_forces_connection_closure(Config) -> VHost1 = <<"vhost1">>, VHost2 = <<"vhost2">>, |
