summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-07-31 21:42:03 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-07-31 21:42:03 +0300
commit5f549efa29403a3e4e351c363e53aa900de320dc (patch)
tree13ccb4948b73e2a4160059e8988f0ff500c8d3cb /src
parent5bcf9ab6ebcfaa5383965710f0e2cb7ebd8cba66 (diff)
parent843199f62259bea5a27793cd2ded68804f30c1a6 (diff)
downloadrabbitmq-server-git-5f549efa29403a3e4e351c363e53aa900de320dc.tar.gz
Merge branch 'master' into rabbitmq-server-1310
Conflicts: test/vhost_SUITE.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_vhost_process.erl1
4 files changed, 30 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff57593374..5537634144 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,7 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -255,6 +256,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+mark_local_durable_queues_stopped(VHost) ->
+ Qs = find_durable_queues(VHost),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [ store_queue(Q#amqqueue{ state = stopped })
+ || Q = #amqqueue{ state = State } <- Qs,
+ State =/= stopped ]
+ end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) ->
E({absent, Q, timeout});
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ E({absent, Q, stopped});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),