summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-07-31 19:16:53 +0300
committerGitHub <noreply@github.com>2017-07-31 19:16:53 +0300
commit843199f62259bea5a27793cd2ded68804f30c1a6 (patch)
treec41dd04b84e13eea7705e0004a9ccef5b2db09b1
parent98566cdd9c22f556c732b8c91059ca44f5a5ed34 (diff)
parent1df9b98f35fddf5d6af47d79fbd015980b71cf8e (diff)
downloadrabbitmq-server-git-843199f62259bea5a27793cd2ded68804f30c1a6.tar.gz
Merge pull request #1309 from rabbitmq/rabbitmq-server-1303
Set queue state to 'stopped' when terminating.
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_vhost_process.erl1
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl33
-rw-r--r--test/vhost_SUITE.erl39
6 files changed, 63 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff57593374..5537634144 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,7 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -255,6 +256,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+mark_local_durable_queues_stopped(VHost) ->
+ Qs = find_durable_queues(VHost),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [ store_queue(Q#amqqueue{ state = stopped })
+ || Q = #amqqueue{ state = State } <- Qs,
+ State =/= stopped ]
+ end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) ->
E({absent, Q, timeout});
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ E({absent, Q, stopped});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 535725d585..71b74ea104 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,6 +31,7 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
+ ,list_queues_stopped/1
]).
all() ->
@@ -44,6 +45,7 @@ groups() ->
[list_queues_local
,list_queues_online
,list_queues_offline
+ ,list_queues_stopped
]}
].
@@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
-end_per_group(global_parameters, Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_, Config) ->
Config.
+init_per_testcase(list_queues_stopped, Config0) ->
+ %% Start node 3 to crash it's queues
+ rabbit_ct_broker_helpers:start_node(Config0, 2),
+ %% Make vhost "down" on nodes 2 and 3
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),
+
+ rabbit_ct_broker_helpers:stop_node(Config0, 2),
+ rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped);
+
init_per_testcase(Testcase, Config0) ->
rabbit_ct_helpers:testcase_started(Config0, Testcase).
@@ -134,6 +142,23 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
+list_queues_stopped(Config) ->
+ Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
+ Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
+ Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))),
+
+ %% All queues are listed
+ ListedQueues =
+ [ {Name, State}
+ || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "state"]) ],
+
+ [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ],
+ %% Node is running. Vhost is down
+ [ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ],
+ %% Node is not running. Vhost is down
+ [ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ].
+
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index a519d01af5..65aef8d410 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -161,7 +161,7 @@ vhost_failure_forces_connection_closure(Config) ->
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
@@ -181,7 +181,7 @@ dead_vhost_connection_refused(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
[_Conn1] = open_connections(Config, [{0, VHost1}]),
@@ -213,7 +213,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
[_Conn21] = open_connections(Config, [{1, VHost2}]),
?assertEqual(2, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Vhost2 connection on node 1 is still alive
?assertEqual(1, count_connections_in(Config, VHost2)),
@@ -236,7 +236,7 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Can open connections to vhost1 on node 0 and 1
[_Conn10] = open_connections(Config, [{0, VHost1}]),
@@ -257,37 +257,6 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
-force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
-
-force_vhost_failure(Config, Node, VHost) ->
- force_vhost_failure(Config, Node, VHost, 10).
-
-force_vhost_failure(_Config, _Node, VHost, 0) ->
- error({failed_to_force_vhost_failure, no_more_attempts_left, VHost});
-force_vhost_failure(Config, Node, VHost, Attempts) ->
- MessageStorePid = get_message_store_pid(Config, VHost),
- rabbit_ct_broker_helpers:rpc(Config, Node,
- erlang, exit,
- [MessageStorePid, force_vhost_failure]),
- %% Give it a time to fail
- timer:sleep(200),
- case rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, is_vhost_alive,
- [VHost]) of
- true -> force_vhost_failure(Config, Node, VHost, Attempts - 1);
- false -> ok
- end.
-
-get_message_store_pid(Config, VHost) ->
- {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, vhost_sup, [VHost]),
- Children = rabbit_ct_broker_helpers:rpc(Config, 0,
- supervisor, which_children,
- [VHostSup]),
- [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children,
- Name == msg_store_persistent],
- MsgStorePid.
-
cluster_vhost_deletion_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,